Skip to main content

nodedb_query/msgpack_scan/
aggregate_helpers.rs

1//! Public helpers for streaming aggregate accumulators.
2//!
3//! These thin wrappers expose field-extraction primitives used by the
4//! `handlers/aggregate.rs` streaming accumulator path in the `nodedb` crate.
5//! Each function operates on a single raw MessagePack document byte slice and
6//! returns only the scalar value needed by the calling accumulator — no
7//! document bytes are retained after the call returns.
8
9use nodedb_types::Value;
10
11use crate::expr::SqlExpr;
12use crate::msgpack_scan::field::extract_field;
13use crate::msgpack_scan::reader::{read_f64, read_str, read_value};
14use crate::value_ops;
15
16// ── Expression evaluator ───────────────────────────────────────────────────
17
18#[inline]
19fn eval_expr(doc: &[u8], expr: &SqlExpr) -> Option<Value> {
20    let doc_val = nodedb_types::json_msgpack::value_from_msgpack(doc).ok()?;
21    Some(expr.eval(&doc_val))
22}
23
24// ── Public extraction helpers ──────────────────────────────────────────────
25
26/// Extract a numeric (f64) value from `field`, or evaluate `expr` if provided.
27/// Returns `None` when the field is absent or cannot be converted to f64.
28#[inline]
29pub fn extract_f64(doc: &[u8], field: &str, expr: Option<&SqlExpr>) -> Option<f64> {
30    if let Some(expr) = expr {
31        return value_ops::value_to_f64(&eval_expr(doc, expr)?, false);
32    }
33    let (start, _end) = extract_field(doc, 0, field)?;
34    read_f64(doc, start)
35}
36
37/// Extract a display string from `field`, or evaluate `expr` if provided.
38/// Returns `None` when the field is absent.
39pub fn extract_str(doc: &[u8], field: &str, expr: Option<&SqlExpr>) -> Option<String> {
40    if let Some(expr) = expr {
41        return Some(value_ops::value_to_display_string(&eval_expr(doc, expr)?));
42    }
43    let (start, _end) = extract_field(doc, 0, field)?;
44    read_str(doc, start).map(|s| s.to_string())
45}
46
47/// Extract a field as `Value`.  Uses direct msgpack→Value for scalars;
48/// falls back to full document decode only for complex types.
49pub fn extract_value(doc: &[u8], field: &str, expr: Option<&SqlExpr>) -> Option<Value> {
50    if let Some(expr) = expr {
51        return eval_expr(doc, expr);
52    }
53    let (start, end) = extract_field(doc, 0, field)?;
54    if let Some(v) = read_value(doc, start) {
55        return Some(v);
56    }
57    let field_bytes = &doc[start..end];
58    nodedb_types::json_msgpack::value_from_msgpack(field_bytes).ok()
59}
60
61/// Extract a field or expression result as raw msgpack bytes.
62/// Used by `count_distinct`, `approx_count_distinct`, `approx_topk`, etc.
63pub fn extract_bytes(doc: &[u8], field: &str, expr: Option<&SqlExpr>) -> Option<Vec<u8>> {
64    if let Some(expr) = expr {
65        let val = eval_expr(doc, expr)?;
66        return nodedb_types::json_msgpack::value_to_msgpack(&val).ok();
67    }
68    let (start, end) = extract_field(doc, 0, field)?;
69    Some(doc[start..end].to_vec())
70}
71
72/// Returns `Some(())` when the field is present and non-null.
73/// Used by `count(field)` accumulator to count non-null values.
74#[inline]
75pub fn extract_non_null(doc: &[u8], field: &str, expr: Option<&SqlExpr>) -> Option<()> {
76    let v = extract_value(doc, field, expr)?;
77    if v.is_null() { None } else { Some(()) }
78}