Skip to main content

nodedb_query/msgpack_scan/
aggregate_helpers.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Public helpers for streaming aggregate accumulators.
4//!
5//! These thin wrappers expose field-extraction primitives used by the
6//! `handlers/aggregate.rs` streaming accumulator path in the `nodedb` crate.
7//! Each function operates on a single raw MessagePack document byte slice and
8//! returns only the scalar value needed by the calling accumulator — no
9//! document bytes are retained after the call returns.
10
11use nodedb_types::Value;
12
13use crate::expr::SqlExpr;
14use crate::msgpack_scan::field::extract_field;
15use crate::msgpack_scan::reader::{read_f64, read_str, read_value};
16use crate::value_ops;
17
18// ── Expression evaluator ───────────────────────────────────────────────────
19
20#[inline]
21fn eval_expr(doc: &[u8], expr: &SqlExpr) -> Option<Value> {
22    let doc_val = nodedb_types::json_msgpack::value_from_msgpack(doc).ok()?;
23    Some(expr.eval(&doc_val))
24}
25
26// ── Public extraction helpers ──────────────────────────────────────────────
27
28/// Extract a numeric (f64) value from `field`, or evaluate `expr` if provided.
29/// Returns `None` when the field is absent or cannot be converted to f64.
30#[inline]
31pub fn extract_f64(doc: &[u8], field: &str, expr: Option<&SqlExpr>) -> Option<f64> {
32    if let Some(expr) = expr {
33        return value_ops::value_to_f64(&eval_expr(doc, expr)?, false);
34    }
35    let (start, _end) = extract_field(doc, 0, field)?;
36    read_f64(doc, start)
37}
38
39/// Extract a display string from `field`, or evaluate `expr` if provided.
40/// Returns `None` when the field is absent.
41pub fn extract_str(doc: &[u8], field: &str, expr: Option<&SqlExpr>) -> Option<String> {
42    if let Some(expr) = expr {
43        return Some(value_ops::value_to_display_string(&eval_expr(doc, expr)?));
44    }
45    let (start, _end) = extract_field(doc, 0, field)?;
46    read_str(doc, start).map(|s| s.to_string())
47}
48
49/// Extract a field as `Value`.  Uses direct msgpack→Value for scalars;
50/// falls back to full document decode only for complex types.
51pub fn extract_value(doc: &[u8], field: &str, expr: Option<&SqlExpr>) -> Option<Value> {
52    if let Some(expr) = expr {
53        return eval_expr(doc, expr);
54    }
55    let (start, end) = extract_field(doc, 0, field)?;
56    if let Some(v) = read_value(doc, start) {
57        return Some(v);
58    }
59    let field_bytes = &doc[start..end];
60    nodedb_types::json_msgpack::value_from_msgpack(field_bytes).ok()
61}
62
63/// Extract a field or expression result as raw msgpack bytes.
64/// Used by `count_distinct`, `approx_count_distinct`, `approx_topk`, etc.
65pub fn extract_bytes(doc: &[u8], field: &str, expr: Option<&SqlExpr>) -> Option<Vec<u8>> {
66    if let Some(expr) = expr {
67        let val = eval_expr(doc, expr)?;
68        return nodedb_types::json_msgpack::value_to_msgpack(&val).ok();
69    }
70    let (start, end) = extract_field(doc, 0, field)?;
71    Some(doc[start..end].to_vec())
72}
73
74/// Returns `Some(())` when the field is present and non-null.
75/// Used by `count(field)` accumulator to count non-null values.
76#[inline]
77pub fn extract_non_null(doc: &[u8], field: &str, expr: Option<&SqlExpr>) -> Option<()> {
78    let v = extract_value(doc, field, expr)?;
79    if v.is_null() { None } else { Some(()) }
80}