Skip to main content

josie_core/
runtime.rs

//! Runtime evaluator and operator registry for JOSIE tree expressions.
//!
//! This module defines:
//! - canonical runtime state ([`State`])
//! - execution context ([`Context`])
//! - operator table ([`Operators`])
//! - recursive evaluator ([`evaluate`])
//!
//! Operator names here are the canonical core surface used by web/cli adapters.
10
11use regex::RegexBuilder;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map, Value, json};
14use std::collections::{HashMap, HashSet};
15
16/// Canonical JSON node type accepted by the evaluator.
17pub type JsonNode = Value;
18/// Standard evaluator return type.
19pub type EvalResult = Result<Value, EvalError>;
20
21/// Runtime evaluation error.
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct EvalError {
24    pub message: String,
25}
26
27impl EvalError {
28    /// Create a new runtime evaluation error.
29    pub fn new(msg: impl Into<String>) -> Self {
30        Self {
31            message: msg.into(),
32        }
33    }
34}
35
36/// Runtime-defined function metadata for `def` / `call`.
37#[derive(Debug, Clone)]
38pub struct FunctionDef {
39    pub params: Vec<String>,
40    pub body: Value,
41    pub memoizable: bool,
42}
43
44/// Runtime state split into `server` and `client` scopes.
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct State {
47    pub server: Map<String, Value>,
48    pub client: Map<String, Value>,
49    #[serde(skip)]
50    pub fns: HashMap<String, FunctionDef>,
51    #[serde(skip)]
52    pub call_locals: Vec<HashMap<String, Value>>,
53    #[serde(skip)]
54    pub call_memo: HashMap<String, HashMap<String, Value>>,
55}
56
57impl State {
58    /// Create an empty runtime state.
59    pub fn new() -> Self {
60        Self {
61            server: Map::new(),
62            client: Map::new(),
63            fns: HashMap::new(),
64            call_locals: Vec::new(),
65            call_memo: HashMap::new(),
66        }
67    }
68}
69
70impl Default for State {
71    fn default() -> Self {
72        Self::new()
73    }
74}
75
76/// Event context used by `w.event.*` operators.
77#[derive(Debug, Clone)]
78pub struct EventContext {
79    pub value: Value,
80    pub key: Value,
81    pub prevent: bool,
82}
83
84impl Default for EventContext {
85    fn default() -> Self {
86        Self {
87            value: Value::Null,
88            key: Value::Null,
89            prevent: false,
90        }
91    }
92}
93
94/// Mutable execution context passed to every operator call.
95pub struct Context<'a> {
96    pub state: &'a mut State,
97    pub operators: &'a Operators,
98    pub event: Option<&'a EventContext>,
99}
100
101/// Operator function signature used by the registry.
102pub type Operator = fn(args: &[Value], ctx: &mut Context) -> EvalResult;
103
104/// Canonical operator registry.
105///
106/// Use [`Operators::register`] to add host-specific operators (typically `x.*`).
107pub struct Operators {
108    ops: HashMap<String, Operator>,
109}
110
111impl Operators {
112    /// Build the default canonical operator registry.
113    pub fn new() -> Self {
114        let mut ops: HashMap<String, Operator> = HashMap::new();
115        ops.insert("var".into(), op_var as Operator);
116        ops.insert("set".into(), op_set as Operator);
117        ops.insert("+".into(), op_add as Operator);
118        ops.insert("-".into(), op_sub as Operator);
119        ops.insert("*".into(), op_mul as Operator);
120        ops.insert("/".into(), op_div as Operator);
121        ops.insert("%".into(), op_mod as Operator);
122        ops.insert("==".into(), op_eq as Operator);
123        ops.insert("!=".into(), op_neq as Operator);
124        ops.insert(">".into(), op_gt as Operator);
125        ops.insert("<".into(), op_lt as Operator);
126        ops.insert(">=".into(), op_gte as Operator);
127        ops.insert("<=".into(), op_lte as Operator);
128        ops.insert("if".into(), op_if as Operator);
129        ops.insert("&&".into(), op_and as Operator);
130        ops.insert("||".into(), op_or as Operator);
131        ops.insert("!".into(), op_not as Operator);
132        ops.insert("util.concat".into(), op_concat as Operator);
133        ops.insert("util.lower".into(), op_lower as Operator);
134        ops.insert("util.upper".into(), op_upper as Operator);
135        ops.insert("util.contains".into(), op_contains as Operator);
136        ops.insert("util.template".into(), op_template as Operator);
137        ops.insert("util.to_int".into(), op_to_int as Operator);
138        ops.insert("util.to_float".into(), op_to_float as Operator);
139        ops.insert("util.to_string".into(), op_to_string as Operator);
140        ops.insert("util.trim".into(), op_trim as Operator);
141        ops.insert("util.str_len".into(), op_str_len as Operator);
142        ops.insert("util.get_path".into(), op_util_get_path as Operator);
143        ops.insert("util.one_of".into(), op_util_one_of as Operator);
144        ops.insert("util.pick".into(), op_util_pick as Operator);
145        ops.insert("util.exists".into(), op_util_exists as Operator);
146        ops.insert("util.omit".into(), op_util_omit as Operator);
147        ops.insert("util.as_array".into(), op_util_as_array as Operator);
148        ops.insert("util.to_bool".into(), op_util_to_bool as Operator);
149        ops.insert("util.replace".into(), op_util_replace as Operator);
150        ops.insert("util.replace_all".into(), op_util_replace_all as Operator);
151        ops.insert("util.split".into(), op_util_split as Operator);
152        ops.insert("util.join".into(), op_util_join as Operator);
153        ops.insert("util.regex_match".into(), op_util_regex_match as Operator);
154        ops.insert("util.regex_find".into(), op_util_regex_find as Operator);
155        ops.insert("util.regex_find_all".into(), op_util_regex_find_all as Operator);
156        ops.insert("util.regex_replace".into(), op_util_regex_replace as Operator);
157        ops.insert("log".into(), op_log as Operator);
158        ops.insert("len".into(), op_len as Operator);
159        ops.insert("push".into(), op_push as Operator);
160        ops.insert("get".into(), op_get as Operator);
161        ops.insert("data.select".into(), op_data_select as Operator);
162        ops.insert("data.rename".into(), op_data_rename as Operator);
163        ops.insert("data.cast".into(), op_data_cast as Operator);
164        ops.insert("data.chunk".into(), op_data_chunk as Operator);
165        ops.insert("data.flat_map".into(), op_data_flat_map as Operator);
166        ops.insert("data.distinct".into(), op_data_distinct as Operator);
167        ops.insert("data.sort_by".into(), op_data_sort_by as Operator);
168        ops.insert("data.group_by".into(), op_data_group_by as Operator);
169        ops.insert("data.aggregate".into(), op_data_aggregate as Operator);
170        ops.insert("data.join".into(), op_data_join as Operator);
171        ops.insert("do".into(), op_do as Operator);
172        ops.insert("pipe".into(), op_pipe as Operator);
173        ops.insert("def".into(), op_def as Operator);
174        ops.insert("call".into(), op_call as Operator);
175        ops.insert("map".into(), op_map as Operator);
176        ops.insert("filter".into(), op_filter as Operator);
177        ops.insert("match".into(), op_match as Operator);
178        ops.insert("w.event.value".into(), op_event_value as Operator);
179        ops.insert("w.event.key".into(), op_event_key as Operator);
180        ops.insert("w.event.prevent".into(), op_event_prevent as Operator);
181        ops.insert("effect".into(), op_effect as Operator);
182        Self { ops }
183    }
184
185    /// Lookup an operator by name.
186    pub fn get(&self, name: &str) -> Option<Operator> {
187        self.ops.get(name).copied()
188    }
189
190    /// Register or replace an operator. Returns previous handler if present.
191    pub fn register(&mut self, name: impl Into<String>, operator: Operator) -> Option<Operator> {
192        self.ops.insert(name.into(), operator)
193    }
194}
195
196impl Default for Operators {
197    fn default() -> Self {
198        Self::new()
199    }
200}
201
202/// Recursively evaluate a JOSIE tree expression.
203///
204/// If the first array element is a known string operator, dispatches to
205/// that operator; otherwise evaluates array/object children as literals.
206pub fn evaluate(node: &Value, ctx: &mut Context) -> EvalResult {
207    match node {
208        Value::Array(arr) => {
209            if arr.is_empty() {
210                return Ok(Value::Array(vec![]));
211            }
212            if let Some(op_name) = arr.first().and_then(|v| v.as_str()) {
213                if let Some(op) = ctx.operators.get(op_name) {
214                    return op(&arr[1..], ctx);
215                }
216            }
217            let mut out = Vec::with_capacity(arr.len());
218            for item in arr {
219                out.push(evaluate(item, ctx)?);
220            }
221            Ok(Value::Array(out))
222        }
223        Value::Object(obj) => {
224            let mut out = Map::new();
225            for (k, v) in obj {
226                out.insert(k.clone(), evaluate(v, ctx)?);
227            }
228            Ok(Value::Object(out))
229        }
230        _ => Ok(node.clone()),
231    }
232}
233
234/// Read a value using dot-path semantics from a state map.
235pub fn get_path(state: &Map<String, Value>, path: &Value) -> Option<Value> {
236    let path = path.as_str()?;
237    if let Some(rest) = path.strip_prefix("client.") {
238        return state
239            .get("client")
240            .and_then(Value::as_object)
241            .and_then(|m| map_get(m, rest));
242    }
243    if let Some(rest) = path.strip_prefix("server.") {
244        return state
245            .get("server")
246            .and_then(Value::as_object)
247            .and_then(|m| map_get(m, rest));
248    }
249    map_get(state, path)
250}
251
252/// Write a value using dot-path semantics into a state map.
253pub fn set_path(state: &mut Map<String, Value>, path: &Value, value: Value) {
254    let Some(path) = path.as_str() else {
255        return;
256    };
257    if let Some(rest) = path.strip_prefix("client.") {
258        let client = state
259            .entry("client".to_string())
260            .or_insert_with(|| Value::Object(Map::new()));
261        if let Some(client_map) = client.as_object_mut() {
262            map_set(client_map, rest, value);
263        }
264        return;
265    }
266    if let Some(rest) = path.strip_prefix("server.") {
267        let server = state
268            .entry("server".to_string())
269            .or_insert_with(|| Value::Object(Map::new()));
270        if let Some(server_map) = server.as_object_mut() {
271            map_set(server_map, rest, value);
272        }
273        return;
274    }
275    map_set(state, path, value);
276}
277
278fn require_arg<'a>(args: &'a [Value], index: usize, op: &str) -> Result<&'a Value, EvalError> {
279    args.get(index)
280        .ok_or_else(|| EvalError::new(format!("{op} requires argument {index}")))
281}
282
283fn as_f64(value: &Value) -> Option<f64> {
284    match value {
285        Value::Number(n) => n.as_f64(),
286        Value::String(s) => s.parse::<f64>().ok(),
287        Value::Bool(b) => Some(if *b { 1.0 } else { 0.0 }),
288        _ => None,
289    }
290}
291
292fn as_i64(value: &Value) -> Option<i64> {
293    match value {
294        Value::Number(n) => n.as_i64().or_else(|| n.as_f64().map(|v| v as i64)),
295        Value::String(s) => s.parse::<i64>().ok(),
296        Value::Bool(b) => Some(if *b { 1 } else { 0 }),
297        _ => None,
298    }
299}
300
301fn truthy(v: &Value) -> bool {
302    match v {
303        Value::Null => false,
304        Value::Bool(b) => *b,
305        Value::Number(n) => n.as_f64().map(|f| f != 0.0).unwrap_or(false),
306        Value::String(s) => !s.is_empty(),
307        Value::Array(a) => !a.is_empty(),
308        Value::Object(o) => !o.is_empty(),
309    }
310}
311
312fn read_state_path(state: &State, path: &str) -> Option<Value> {
313    if !path.contains('.') {
314        for frame in state.call_locals.iter().rev() {
315            if let Some(v) = frame.get(path) {
316                return Some(v.clone());
317            }
318        }
319    }
320    if let Some(rest) = path.strip_prefix("client.") {
321        return map_get(&state.client, rest);
322    }
323    if let Some(rest) = path.strip_prefix("server.") {
324        return map_get(&state.server, rest);
325    }
326    map_get(&state.client, path).or_else(|| map_get(&state.server, path))
327}
328
329fn write_state_path(state: &mut State, path: &str, value: Value) {
330    if !path.contains('.') {
331        if let Some(frame) = state.call_locals.last_mut() {
332            if frame.contains_key(path) {
333                frame.insert(path.to_string(), value);
334                return;
335            }
336        }
337    }
338    if let Some(rest) = path.strip_prefix("client.") {
339        map_set(&mut state.client, rest, value);
340        return;
341    }
342    if let Some(rest) = path.strip_prefix("server.") {
343        map_set(&mut state.server, rest, value);
344        return;
345    }
346    map_set(&mut state.client, path, value);
347}
348
349fn map_get(map: &Map<String, Value>, path: &str) -> Option<Value> {
350    if path.is_empty() {
351        return Some(Value::Object(map.clone()));
352    }
353    let mut parts = path.split('.');
354    let first = parts.next()?;
355    let mut current = map.get(first)?;
356    for part in parts {
357        match current {
358            Value::Object(obj) => current = obj.get(part)?,
359            Value::Array(arr) => {
360                let idx = part.parse::<usize>().ok()?;
361                current = arr.get(idx)?;
362            }
363            _ => return None,
364        }
365    }
366    Some(current.clone())
367}
368
369fn map_set(map: &mut Map<String, Value>, path: &str, value: Value) {
370    let parts: Vec<&str> = path.split('.').filter(|p| !p.is_empty()).collect();
371    if parts.is_empty() {
372        return;
373    }
374    let mut cur = map;
375    for part in &parts[..parts.len() - 1] {
376        let entry = cur
377            .entry((*part).to_string())
378            .or_insert_with(|| Value::Object(Map::new()));
379        if !entry.is_object() {
380            *entry = Value::Object(Map::new());
381        }
382        cur = entry
383            .as_object_mut()
384            .expect("object ensured before descending");
385    }
386    cur.insert(parts[parts.len() - 1].to_string(), value);
387}
388
/// One step of a parsed access path (see `parse_path`).
#[derive(Debug, Clone)]
enum PathSegment {
    /// Object member access by key, e.g. the `b` in `a.b`.
    Key(String),
    /// Array element access by position, e.g. the `0` in `a[0]`.
    Index(usize),
}
394
395fn parse_path(path: &str) -> Option<Vec<PathSegment>> {
396    if path.is_empty() {
397        return Some(Vec::new());
398    }
399
400    let chars: Vec<char> = path.chars().collect();
401    let mut i = 0usize;
402    let mut key = String::new();
403    let mut out = Vec::new();
404
405    while i < chars.len() {
406        match chars[i] {
407            '.' => {
408                if !key.is_empty() {
409                    out.push(PathSegment::Key(std::mem::take(&mut key)));
410                }
411                i += 1;
412            }
413            '[' => {
414                if !key.is_empty() {
415                    out.push(PathSegment::Key(std::mem::take(&mut key)));
416                }
417                i += 1;
418                let start = i;
419                while i < chars.len() && chars[i] != ']' {
420                    i += 1;
421                }
422                if i >= chars.len() || i == start {
423                    return None;
424                }
425                let idx_text: String = chars[start..i].iter().collect();
426                let idx = idx_text.parse::<usize>().ok()?;
427                out.push(PathSegment::Index(idx));
428                i += 1;
429            }
430            c => {
431                key.push(c);
432                i += 1;
433            }
434        }
435    }
436
437    if !key.is_empty() {
438        out.push(PathSegment::Key(key));
439    }
440    Some(out)
441}
442
443fn value_get_path(value: &Value, path: &str) -> Option<Value> {
444    let segs = parse_path(path)?;
445    if segs.is_empty() {
446        return Some(value.clone());
447    }
448    value_get_by_segments(value, &segs)
449}
450
451fn value_get_by_segments(value: &Value, segs: &[PathSegment]) -> Option<Value> {
452    let mut cur = value;
453    for seg in segs {
454        match seg {
455            PathSegment::Key(k) => cur = cur.as_object()?.get(k)?,
456            PathSegment::Index(i) => cur = cur.as_array()?.get(*i)?,
457        }
458    }
459    Some(cur.clone())
460}
461
462fn value_set_by_segments(target: &mut Value, segs: &[PathSegment], value: Value) {
463    fn set_at(cur: &mut Value, segs: &[PathSegment], idx: usize, value: Value) {
464        if idx >= segs.len() {
465            *cur = value;
466            return;
467        }
468        match &segs[idx] {
469            PathSegment::Key(k) => {
470                if !cur.is_object() {
471                    *cur = Value::Object(Map::new());
472                }
473                let obj = cur.as_object_mut().expect("object ensured");
474                let entry = obj.entry(k.clone()).or_insert(Value::Null);
475                set_at(entry, segs, idx + 1, value);
476            }
477            PathSegment::Index(i) => {
478                if !cur.is_array() {
479                    *cur = Value::Array(Vec::new());
480                }
481                let arr = cur.as_array_mut().expect("array ensured");
482                if arr.len() <= *i {
483                    arr.resize(*i + 1, Value::Null);
484                }
485                set_at(&mut arr[*i], segs, idx + 1, value);
486            }
487        }
488    }
489
490    set_at(target, segs, 0, value);
491}
492
493fn value_take_by_segments(target: &mut Value, segs: &[PathSegment]) -> Option<Value> {
494    if segs.is_empty() {
495        return None;
496    }
497
498    fn take_at(cur: &mut Value, segs: &[PathSegment], idx: usize) -> Option<Value> {
499        if idx + 1 == segs.len() {
500            return match &segs[idx] {
501                PathSegment::Key(k) => cur.as_object_mut()?.remove(k),
502                PathSegment::Index(i) => {
503                    let arr = cur.as_array_mut()?;
504                    if *i >= arr.len() {
505                        None
506                    } else {
507                        Some(std::mem::replace(&mut arr[*i], Value::Null))
508                    }
509                }
510            };
511        }
512
513        match &segs[idx] {
514            PathSegment::Key(k) => take_at(cur.as_object_mut()?.get_mut(k)?, segs, idx + 1),
515            PathSegment::Index(i) => take_at(cur.as_array_mut()?.get_mut(*i)?, segs, idx + 1),
516        }
517    }
518
519    take_at(target, segs, 0)
520}
521
522fn value_to_string(v: Value) -> String {
523    match v {
524        Value::String(s) => s,
525        Value::Null => String::new(),
526        other => other.to_string(),
527    }
528}
529
530fn value_to_bool(v: Value) -> bool {
531    match v {
532        Value::Bool(b) => b,
533        Value::Number(n) => n.as_f64().map(|x| x != 0.0).unwrap_or(false),
534        Value::String(s) => {
535            let norm = s.trim().to_ascii_lowercase();
536            matches!(norm.as_str(), "1" | "true" | "yes" | "y" | "on")
537        }
538        Value::Array(a) => !a.is_empty(),
539        Value::Object(o) => !o.is_empty(),
540        Value::Null => false,
541    }
542}
543
544fn cast_value(value: Value, ty: &str) -> Value {
545    match ty.trim().to_ascii_lowercase().as_str() {
546        "string" | "str" | "text" => Value::String(value_to_string(value)),
547        "int" | "i64" | "u64" | "integer" => json!(as_i64(&value).unwrap_or(0)),
548        "float" | "f64" | "number" => json!(as_f64(&value).unwrap_or(0.0)),
549        "bool" | "boolean" => Value::Bool(value_to_bool(value)),
550        "datetime" | "timestamp" => Value::String(value_to_string(value)),
551        _ => value,
552    }
553}
554
/// Flags controlling how a regex is built and applied.
///
/// The derived `Default` sets every flag to `false`.
#[derive(Debug, Clone, Default)]
struct RegexOpts {
    /// Match letters regardless of case.
    case_insensitive: bool,
    /// Multi-line mode, as passed to the regex builder.
    multi_line: bool,
    /// Let `.` match newlines as well.
    dot_matches_new_line: bool,
    /// Unicode mode, as passed to the regex builder.
    unicode: bool,
    /// NOTE(review): presumably "apply to all matches" vs. first only; the
    /// consumers are not visible in this part of the file — confirm.
    all: bool,
}
563
564fn parse_regex_opts(raw: Option<&Value>) -> RegexOpts {
565    let mut out = RegexOpts {
566        unicode: true,
567        all: true,
568        ..RegexOpts::default()
569    };
570
571    let Some(raw) = raw else {
572        return out;
573    };
574
575    match raw {
576        Value::Bool(b) => out.all = *b,
577        Value::String(flags) => {
578            for ch in flags.chars() {
579                match ch {
580                    'i' | 'I' => out.case_insensitive = true,
581                    'm' | 'M' => out.multi_line = true,
582                    's' | 'S' => out.dot_matches_new_line = true,
583                    'u' | 'U' => out.unicode = true,
584                    'g' | 'G' => out.all = true,
585                    _ => {}
586                }
587            }
588        }
589        Value::Object(map) => {
590            out.case_insensitive = map
591                .get("case_insensitive")
592                .or_else(|| map.get("ignore_case"))
593                .map(|v| value_to_bool(v.clone()))
594                .unwrap_or(false);
595            out.multi_line = map
596                .get("multi_line")
597                .or_else(|| map.get("multiline"))
598                .map(|v| value_to_bool(v.clone()))
599                .unwrap_or(false);
600            out.dot_matches_new_line = map
601                .get("dot_matches_new_line")
602                .or_else(|| map.get("dot_all"))
603                .map(|v| value_to_bool(v.clone()))
604                .unwrap_or(false);
605            out.unicode = map
606                .get("unicode")
607                .map(|v| value_to_bool(v.clone()))
608                .unwrap_or(true);
609            out.all = map
610                .get("all")
611                .map(|v| value_to_bool(v.clone()))
612                .unwrap_or(true);
613        }
614        _ => {}
615    }
616
617    out
618}
619
620fn build_regex(pattern: &str, opts: &RegexOpts) -> Result<regex::Regex, EvalError> {
621    let mut builder = RegexBuilder::new(pattern);
622    builder
623        .case_insensitive(opts.case_insensitive)
624        .multi_line(opts.multi_line)
625        .dot_matches_new_line(opts.dot_matches_new_line)
626        .unicode(opts.unicode);
627    builder
628        .build()
629        .map_err(|e| EvalError::new(format!("invalid regex '{pattern}': {e}")))
630}
631
632fn cmp_numbers_or_strings(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
633    match (as_f64(a), as_f64(b)) {
634        (Some(x), Some(y)) => x.partial_cmp(&y),
635        _ => Some(a.to_string().cmp(&b.to_string())),
636    }
637}
638
639#[derive(Debug, Clone)]
640enum AggExpr {
641    CountAll,
642    CountField(Vec<PathSegment>),
643    Sum(Vec<PathSegment>),
644    Avg(Vec<PathSegment>),
645    Min(Vec<PathSegment>),
646    Max(Vec<PathSegment>),
647}
648
649fn parse_agg_expr(raw: &str) -> Option<AggExpr> {
650    let raw = raw.trim();
651    let text = raw.to_ascii_lowercase();
652    if text == "count(*)" {
653        return Some(AggExpr::CountAll);
654    }
655    let parse_fn = |name: &str| -> Option<Vec<PathSegment>> {
656        let prefix = format!("{name}(");
657        if !text.starts_with(&prefix) || !text.ends_with(')') {
658            return None;
659        }
660        let inner = raw[prefix.len()..raw.len() - 1].trim();
661        parse_path(inner)
662    };
663
664    parse_fn("count")
665        .map(AggExpr::CountField)
666        .or_else(|| parse_fn("sum").map(AggExpr::Sum))
667        .or_else(|| parse_fn("avg").map(AggExpr::Avg))
668        .or_else(|| parse_fn("min").map(AggExpr::Min))
669        .or_else(|| parse_fn("max").map(AggExpr::Max))
670}
671
672fn op_var(args: &[Value], ctx: &mut Context) -> EvalResult {
673    let raw = evaluate(require_arg(args, 0, "var")?, ctx)?;
674    let Some(path) = raw.as_str() else {
675        return Ok(Value::Null);
676    };
677    Ok(read_state_path(ctx.state, path).unwrap_or(Value::Null))
678}
679
680fn op_set(args: &[Value], ctx: &mut Context) -> EvalResult {
681    let path_val = evaluate(require_arg(args, 0, "set")?, ctx)?;
682    let value = evaluate(require_arg(args, 1, "set")?, ctx)?;
683    let Some(path) = path_val.as_str() else {
684        return Err(EvalError::new("set path must evaluate to string"));
685    };
686    write_state_path(ctx.state, path, value.clone());
687    Ok(value)
688}
689
690fn op_add(args: &[Value], ctx: &mut Context) -> EvalResult {
691    let mut total = 0.0;
692    for arg in args {
693        let v = evaluate(arg, ctx)?;
694        total += as_f64(&v).unwrap_or(0.0);
695    }
696    Ok(json!(total))
697}
698
699fn op_sub(args: &[Value], ctx: &mut Context) -> EvalResult {
700    let first = evaluate(require_arg(args, 0, "-")?, ctx)?;
701    let second = evaluate(require_arg(args, 1, "-")?, ctx)?;
702    Ok(json!(
703        as_f64(&first).unwrap_or(0.0) - as_f64(&second).unwrap_or(0.0)
704    ))
705}
706
707fn op_mul(args: &[Value], ctx: &mut Context) -> EvalResult {
708    let mut product = 1.0;
709    for arg in args {
710        let v = evaluate(arg, ctx)?;
711        product *= as_f64(&v).unwrap_or(0.0);
712    }
713    Ok(json!(product))
714}
715
716fn op_div(args: &[Value], ctx: &mut Context) -> EvalResult {
717    let a = evaluate(require_arg(args, 0, "/")?, ctx)?;
718    let b = evaluate(require_arg(args, 1, "/")?, ctx)?;
719    let denom = as_f64(&b).unwrap_or(0.0);
720    if denom == 0.0 {
721        return Err(EvalError::new("div by zero"));
722    }
723    Ok(json!(as_f64(&a).unwrap_or(0.0) / denom))
724}
725
726fn op_mod(args: &[Value], ctx: &mut Context) -> EvalResult {
727    let a = evaluate(require_arg(args, 0, "%")?, ctx)?;
728    let b = evaluate(require_arg(args, 1, "%")?, ctx)?;
729    let denom = as_i64(&b).unwrap_or(0);
730    if denom == 0 {
731        return Err(EvalError::new("mod by zero"));
732    }
733    Ok(json!(as_i64(&a).unwrap_or(0) % denom))
734}
735
736fn op_eq(args: &[Value], ctx: &mut Context) -> EvalResult {
737    let a = evaluate(require_arg(args, 0, "==")?, ctx)?;
738    let b = evaluate(require_arg(args, 1, "==")?, ctx)?;
739    Ok(Value::Bool(a == b))
740}
741
742fn op_neq(args: &[Value], ctx: &mut Context) -> EvalResult {
743    let a = evaluate(require_arg(args, 0, "!=")?, ctx)?;
744    let b = evaluate(require_arg(args, 1, "!=")?, ctx)?;
745    Ok(Value::Bool(a != b))
746}
747
748fn op_gt(args: &[Value], ctx: &mut Context) -> EvalResult {
749    let a = evaluate(require_arg(args, 0, ">")?, ctx)?;
750    let b = evaluate(require_arg(args, 1, ">")?, ctx)?;
751    Ok(Value::Bool(
752        cmp_numbers_or_strings(&a, &b).is_some_and(|o| o.is_gt()),
753    ))
754}
755
756fn op_lt(args: &[Value], ctx: &mut Context) -> EvalResult {
757    let a = evaluate(require_arg(args, 0, "<")?, ctx)?;
758    let b = evaluate(require_arg(args, 1, "<")?, ctx)?;
759    Ok(Value::Bool(
760        cmp_numbers_or_strings(&a, &b).is_some_and(|o| o.is_lt()),
761    ))
762}
763
764fn op_gte(args: &[Value], ctx: &mut Context) -> EvalResult {
765    let a = evaluate(require_arg(args, 0, ">=")?, ctx)?;
766    let b = evaluate(require_arg(args, 1, ">=")?, ctx)?;
767    Ok(Value::Bool(
768        cmp_numbers_or_strings(&a, &b).is_some_and(|o| !o.is_lt()),
769    ))
770}
771
772fn op_lte(args: &[Value], ctx: &mut Context) -> EvalResult {
773    let a = evaluate(require_arg(args, 0, "<=")?, ctx)?;
774    let b = evaluate(require_arg(args, 1, "<=")?, ctx)?;
775    Ok(Value::Bool(
776        cmp_numbers_or_strings(&a, &b).is_some_and(|o| !o.is_gt()),
777    ))
778}
779
780fn op_if(args: &[Value], ctx: &mut Context) -> EvalResult {
781    let cond = evaluate(require_arg(args, 0, "if")?, ctx)?;
782    if truthy(&cond) {
783        if let Some(t) = args.get(1) {
784            evaluate(t, ctx)
785        } else {
786            Ok(Value::Null)
787        }
788    } else if let Some(f) = args.get(2) {
789        evaluate(f, ctx)
790    } else {
791        Ok(Value::Null)
792    }
793}
794
795fn op_and(args: &[Value], ctx: &mut Context) -> EvalResult {
796    let mut last = Value::Bool(true);
797    for arg in args {
798        last = evaluate(arg, ctx)?;
799        if !truthy(&last) {
800            return Ok(Value::Bool(false));
801        }
802    }
803    Ok(last)
804}
805
806fn op_or(args: &[Value], ctx: &mut Context) -> EvalResult {
807    for arg in args {
808        let v = evaluate(arg, ctx)?;
809        if truthy(&v) {
810            return Ok(v);
811        }
812    }
813    Ok(Value::Bool(false))
814}
815
816fn op_not(args: &[Value], ctx: &mut Context) -> EvalResult {
817    let v = evaluate(require_arg(args, 0, "!")?, ctx)?;
818    Ok(Value::Bool(!truthy(&v)))
819}
820
821fn op_concat(args: &[Value], ctx: &mut Context) -> EvalResult {
822    let mut out = String::new();
823    for arg in args {
824        let v = evaluate(arg, ctx)?;
825        match v {
826            Value::String(s) => out.push_str(&s),
827            Value::Null => {}
828            other => out.push_str(&other.to_string()),
829        }
830    }
831    Ok(Value::String(out))
832}
833
834fn op_lower(args: &[Value], ctx: &mut Context) -> EvalResult {
835    let v = evaluate(require_arg(args, 0, "util.lower")?, ctx)?;
836    Ok(Value::String(
837        v.as_str().unwrap_or(&v.to_string()).to_lowercase(),
838    ))
839}
840
841fn op_upper(args: &[Value], ctx: &mut Context) -> EvalResult {
842    let v = evaluate(require_arg(args, 0, "util.upper")?, ctx)?;
843    Ok(Value::String(
844        v.as_str().unwrap_or(&v.to_string()).to_uppercase(),
845    ))
846}
847
848fn op_contains(args: &[Value], ctx: &mut Context) -> EvalResult {
849    let hay = evaluate(require_arg(args, 0, "util.contains")?, ctx)?;
850    let needle = evaluate(require_arg(args, 1, "util.contains")?, ctx)?;
851    let result = match hay {
852        Value::String(s) => s.contains(needle.as_str().unwrap_or(&needle.to_string())),
853        Value::Array(arr) => arr.contains(&needle),
854        _ => false,
855    };
856    Ok(Value::Bool(result))
857}
858
859fn op_template(args: &[Value], ctx: &mut Context) -> EvalResult {
860    let fmt = evaluate(require_arg(args, 0, "util.template")?, ctx)?;
861    let mut out = fmt.as_str().unwrap_or(&fmt.to_string()).to_string();
862    for arg in &args[1..] {
863        let v = evaluate(arg, ctx)?;
864        let rep = match v {
865            Value::String(s) => s,
866            Value::Null => String::new(),
867            other => other.to_string(),
868        };
869        if out.contains("{}") {
870            out = out.replacen("{}", &rep, 1);
871        }
872    }
873    Ok(Value::String(out))
874}
875
876fn op_to_int(args: &[Value], ctx: &mut Context) -> EvalResult {
877    let v = evaluate(require_arg(args, 0, "util.to_int")?, ctx)?;
878    Ok(json!(as_i64(&v).unwrap_or(0)))
879}
880
881fn op_to_float(args: &[Value], ctx: &mut Context) -> EvalResult {
882    let v = evaluate(require_arg(args, 0, "util.to_float")?, ctx)?;
883    Ok(json!(as_f64(&v).unwrap_or(0.0)))
884}
885
886fn op_to_string(args: &[Value], ctx: &mut Context) -> EvalResult {
887    let v = evaluate(require_arg(args, 0, "util.to_string")?, ctx)?;
888    Ok(Value::String(match v {
889        Value::String(s) => s,
890        Value::Null => String::new(),
891        other => other.to_string(),
892    }))
893}
894
895fn op_trim(args: &[Value], ctx: &mut Context) -> EvalResult {
896    let v = evaluate(require_arg(args, 0, "util.trim")?, ctx)?;
897    Ok(Value::String(
898        v.as_str().unwrap_or(&v.to_string()).trim().to_string(),
899    ))
900}
901
902fn op_str_len(args: &[Value], ctx: &mut Context) -> EvalResult {
903    let v = evaluate(require_arg(args, 0, "util.str_len")?, ctx)?;
904    Ok(json!(v.as_str().unwrap_or(&v.to_string()).chars().count()))
905}
906
907fn op_util_get_path(args: &[Value], ctx: &mut Context) -> EvalResult {
908    let source = evaluate(require_arg(args, 0, "util.get_path")?, ctx)?;
909    let path = evaluate(require_arg(args, 1, "util.get_path")?, ctx)?;
910    let fallback = if let Some(arg) = args.get(2) {
911        evaluate(arg, ctx)?
912    } else {
913        Value::Null
914    };
915    let Some(path) = path.as_str() else {
916        return Ok(fallback);
917    };
918    Ok(value_get_path(&source, path).unwrap_or(fallback))
919}
920
921fn op_util_one_of(args: &[Value], ctx: &mut Context) -> EvalResult {
922    let source = evaluate(require_arg(args, 0, "util.one_of")?, ctx)?;
923    let paths = evaluate(require_arg(args, 1, "util.one_of")?, ctx)?;
924    let fallback = if let Some(arg) = args.get(2) {
925        evaluate(arg, ctx)?
926    } else {
927        Value::Null
928    };
929
930    if let Some(list) = paths.as_array() {
931        for item in list {
932            if let Some(path) = item.as_str() {
933                if let Some(v) = value_get_path(&source, path) {
934                    return Ok(v);
935                }
936            }
937        }
938    } else if let Some(path) = paths.as_str() {
939        if let Some(v) = value_get_path(&source, path) {
940            return Ok(v);
941        }
942    }
943
944    Ok(fallback)
945}
946
947fn op_util_pick(args: &[Value], ctx: &mut Context) -> EvalResult {
948    let source = evaluate(require_arg(args, 0, "util.pick")?, ctx)?;
949    let paths = evaluate(require_arg(args, 1, "util.pick")?, ctx)?;
950    let mut out = Value::Object(Map::new());
951
952    if let Some(list) = paths.as_array() {
953        for item in list {
954            let Some(path) = item.as_str() else {
955                continue;
956            };
957            let Some(segs) = parse_path(path) else {
958                continue;
959            };
960            if let Some(v) = value_get_by_segments(&source, &segs) {
961                value_set_by_segments(&mut out, &segs, v);
962            }
963        }
964    }
965
966    Ok(out)
967}
968
969fn op_util_exists(args: &[Value], ctx: &mut Context) -> EvalResult {
970    let source = evaluate(require_arg(args, 0, "util.exists")?, ctx)?;
971    let path = evaluate(require_arg(args, 1, "util.exists")?, ctx)?;
972    let Some(path) = path.as_str() else {
973        return Ok(Value::Bool(false));
974    };
975    Ok(Value::Bool(value_get_path(&source, path).is_some()))
976}
977
978fn op_util_omit(args: &[Value], ctx: &mut Context) -> EvalResult {
979    let source = evaluate(require_arg(args, 0, "util.omit")?, ctx)?;
980    let paths = evaluate(require_arg(args, 1, "util.omit")?, ctx)?;
981    let mut out = source;
982
983    if let Some(list) = paths.as_array() {
984        for item in list {
985            let Some(path) = item.as_str() else {
986                continue;
987            };
988            let Some(segs) = parse_path(path) else {
989                continue;
990            };
991            let _ = value_take_by_segments(&mut out, &segs);
992        }
993    }
994
995    Ok(out)
996}
997
998fn op_util_as_array(args: &[Value], ctx: &mut Context) -> EvalResult {
999    let value = evaluate(require_arg(args, 0, "util.as_array")?, ctx)?;
1000    match value {
1001        Value::Array(arr) => Ok(Value::Array(arr)),
1002        Value::Null => Ok(Value::Array(Vec::new())),
1003        other => Ok(Value::Array(vec![other])),
1004    }
1005}
1006
1007fn op_util_to_bool(args: &[Value], ctx: &mut Context) -> EvalResult {
1008    let value = evaluate(require_arg(args, 0, "util.to_bool")?, ctx)?;
1009    Ok(Value::Bool(value_to_bool(value)))
1010}
1011
1012fn op_util_replace(args: &[Value], ctx: &mut Context) -> EvalResult {
1013    let source = value_to_string(evaluate(require_arg(args, 0, "util.replace")?, ctx)?);
1014    let from = value_to_string(evaluate(require_arg(args, 1, "util.replace")?, ctx)?);
1015    let to = value_to_string(evaluate(require_arg(args, 2, "util.replace")?, ctx)?);
1016    if from.is_empty() {
1017        return Ok(Value::String(source));
1018    }
1019    Ok(Value::String(source.replacen(&from, &to, 1)))
1020}
1021
1022fn op_util_replace_all(args: &[Value], ctx: &mut Context) -> EvalResult {
1023    let source = value_to_string(evaluate(require_arg(args, 0, "util.replace_all")?, ctx)?);
1024    let from = value_to_string(evaluate(require_arg(args, 1, "util.replace_all")?, ctx)?);
1025    let to = value_to_string(evaluate(require_arg(args, 2, "util.replace_all")?, ctx)?);
1026    if from.is_empty() {
1027        return Ok(Value::String(source));
1028    }
1029    Ok(Value::String(source.replace(&from, &to)))
1030}
1031
1032fn op_util_split(args: &[Value], ctx: &mut Context) -> EvalResult {
1033    let source = value_to_string(evaluate(require_arg(args, 0, "util.split")?, ctx)?);
1034    let delim = value_to_string(evaluate(require_arg(args, 1, "util.split")?, ctx)?);
1035    if delim.is_empty() {
1036        let parts: Vec<Value> = source
1037            .chars()
1038            .map(|c| Value::String(c.to_string()))
1039            .collect();
1040        return Ok(Value::Array(parts));
1041    }
1042    Ok(Value::Array(
1043        source
1044            .split(&delim)
1045            .map(|s| Value::String(s.to_string()))
1046            .collect(),
1047    ))
1048}
1049
1050fn op_util_join(args: &[Value], ctx: &mut Context) -> EvalResult {
1051    let values = evaluate(require_arg(args, 0, "util.join")?, ctx)?;
1052    let delim = value_to_string(evaluate(require_arg(args, 1, "util.join")?, ctx)?);
1053    let Some(arr) = values.as_array() else {
1054        return Ok(Value::String(value_to_string(values)));
1055    };
1056    let out = arr
1057        .iter()
1058        .cloned()
1059        .map(value_to_string)
1060        .collect::<Vec<_>>()
1061        .join(&delim);
1062    Ok(Value::String(out))
1063}
1064
1065fn op_util_regex_match(args: &[Value], ctx: &mut Context) -> EvalResult {
1066    let source = value_to_string(evaluate(require_arg(args, 0, "util.regex_match")?, ctx)?);
1067    let pattern = value_to_string(evaluate(require_arg(args, 1, "util.regex_match")?, ctx)?);
1068    let opts_val = if let Some(v) = args.get(2) {
1069        Some(evaluate(v, ctx)?)
1070    } else {
1071        None
1072    };
1073    let opts = parse_regex_opts(opts_val.as_ref());
1074    let re = build_regex(&pattern, &opts)?;
1075    Ok(Value::Bool(re.is_match(&source)))
1076}
1077
1078fn op_util_regex_find(args: &[Value], ctx: &mut Context) -> EvalResult {
1079    let source = value_to_string(evaluate(require_arg(args, 0, "util.regex_find")?, ctx)?);
1080    let pattern = value_to_string(evaluate(require_arg(args, 1, "util.regex_find")?, ctx)?);
1081    let opts_val = if let Some(v) = args.get(2) {
1082        Some(evaluate(v, ctx)?)
1083    } else {
1084        None
1085    };
1086    let opts = parse_regex_opts(opts_val.as_ref());
1087    let re = build_regex(&pattern, &opts)?;
1088    Ok(re
1089        .find(&source)
1090        .map(|m| Value::String(m.as_str().to_string()))
1091        .unwrap_or(Value::Null))
1092}
1093
1094fn op_util_regex_find_all(args: &[Value], ctx: &mut Context) -> EvalResult {
1095    let source = value_to_string(evaluate(require_arg(args, 0, "util.regex_find_all")?, ctx)?);
1096    let pattern = value_to_string(evaluate(require_arg(args, 1, "util.regex_find_all")?, ctx)?);
1097    let opts_val = if let Some(v) = args.get(2) {
1098        Some(evaluate(v, ctx)?)
1099    } else {
1100        None
1101    };
1102    let opts = parse_regex_opts(opts_val.as_ref());
1103    let re = build_regex(&pattern, &opts)?;
1104    Ok(Value::Array(
1105        re.find_iter(&source)
1106            .map(|m| Value::String(m.as_str().to_string()))
1107            .collect(),
1108    ))
1109}
1110
1111fn op_util_regex_replace(args: &[Value], ctx: &mut Context) -> EvalResult {
1112    let source = value_to_string(evaluate(require_arg(args, 0, "util.regex_replace")?, ctx)?);
1113    let pattern = value_to_string(evaluate(require_arg(args, 1, "util.regex_replace")?, ctx)?);
1114    let replacement = value_to_string(evaluate(require_arg(args, 2, "util.regex_replace")?, ctx)?);
1115    let opts_val = if let Some(v) = args.get(3) {
1116        Some(evaluate(v, ctx)?)
1117    } else {
1118        None
1119    };
1120    let opts = parse_regex_opts(opts_val.as_ref());
1121    let re = build_regex(&pattern, &opts)?;
1122
1123    let out = if opts.all {
1124        re.replace_all(&source, replacement.as_str()).to_string()
1125    } else {
1126        re.replace(&source, replacement.as_str()).to_string()
1127    };
1128    Ok(Value::String(out))
1129}
1130
1131fn op_data_select(args: &[Value], ctx: &mut Context) -> EvalResult {
1132    let rows = evaluate(require_arg(args, 0, "data.select")?, ctx)?;
1133    let fields = evaluate(require_arg(args, 1, "data.select")?, ctx)?;
1134    let arr = rows
1135        .as_array()
1136        .ok_or_else(|| EvalError::new("data.select first arg must be array"))?;
1137    let fields = fields
1138        .as_array()
1139        .ok_or_else(|| EvalError::new("data.select second arg must be array"))?;
1140
1141    let mut parsed_fields: Vec<Vec<PathSegment>> = Vec::new();
1142    for field in fields {
1143        if let Some(path) = field.as_str() {
1144            if let Some(segs) = parse_path(path) {
1145                parsed_fields.push(segs);
1146            }
1147        }
1148    }
1149
1150    let mut out = Vec::with_capacity(arr.len());
1151    for row in arr {
1152        let mut next = Value::Object(Map::new());
1153        for segs in &parsed_fields {
1154            let val = value_get_by_segments(row, segs).unwrap_or(Value::Null);
1155            value_set_by_segments(&mut next, segs, val);
1156        }
1157        out.push(next);
1158    }
1159    Ok(Value::Array(out))
1160}
1161
1162fn op_data_rename(args: &[Value], ctx: &mut Context) -> EvalResult {
1163    let rows = evaluate(require_arg(args, 0, "data.rename")?, ctx)?;
1164    let mapping = evaluate(require_arg(args, 1, "data.rename")?, ctx)?;
1165    let arr = rows
1166        .as_array()
1167        .ok_or_else(|| EvalError::new("data.rename first arg must be array"))?;
1168    let mapping = mapping
1169        .as_object()
1170        .ok_or_else(|| EvalError::new("data.rename second arg must be object"))?;
1171
1172    let mut pairs = Vec::new();
1173    for (from, to) in mapping {
1174        let Some(to_path) = to.as_str() else {
1175            continue;
1176        };
1177        let (Some(from_segs), Some(to_segs)) = (parse_path(from), parse_path(to_path)) else {
1178            continue;
1179        };
1180        pairs.push((from_segs, to_segs));
1181    }
1182
1183    let mut out = Vec::with_capacity(arr.len());
1184    for row in arr {
1185        let mut next = row.clone();
1186        for (from, to) in &pairs {
1187            if let Some(v) = value_take_by_segments(&mut next, from) {
1188                value_set_by_segments(&mut next, to, v);
1189            }
1190        }
1191        out.push(next);
1192    }
1193
1194    Ok(Value::Array(out))
1195}
1196
1197fn op_data_cast(args: &[Value], ctx: &mut Context) -> EvalResult {
1198    let rows = evaluate(require_arg(args, 0, "data.cast")?, ctx)?;
1199    let mapping = evaluate(require_arg(args, 1, "data.cast")?, ctx)?;
1200    let arr = rows
1201        .as_array()
1202        .ok_or_else(|| EvalError::new("data.cast first arg must be array"))?;
1203    let mapping = mapping
1204        .as_object()
1205        .ok_or_else(|| EvalError::new("data.cast second arg must be object"))?;
1206
1207    let mut casts = Vec::new();
1208    for (field, ty) in mapping {
1209        let Some(ty_name) = ty.as_str() else {
1210            continue;
1211        };
1212        let Some(segs) = parse_path(field) else {
1213            continue;
1214        };
1215        casts.push((segs, ty_name.to_string()));
1216    }
1217
1218    let mut out = Vec::with_capacity(arr.len());
1219    for row in arr {
1220        let mut next = row.clone();
1221        for (segs, ty) in &casts {
1222            if let Some(v) = value_get_by_segments(&next, segs) {
1223                let casted = cast_value(v, ty);
1224                value_set_by_segments(&mut next, segs, casted);
1225            }
1226        }
1227        out.push(next);
1228    }
1229    Ok(Value::Array(out))
1230}
1231
1232fn op_data_chunk(args: &[Value], ctx: &mut Context) -> EvalResult {
1233    let rows = evaluate(require_arg(args, 0, "data.chunk")?, ctx)?;
1234    let size = evaluate(require_arg(args, 1, "data.chunk")?, ctx)?;
1235    let arr = rows
1236        .as_array()
1237        .ok_or_else(|| EvalError::new("data.chunk first arg must be array"))?;
1238    let n = as_i64(&size).unwrap_or(0);
1239    if n <= 0 {
1240        return Err(EvalError::new("data.chunk size must be > 0"));
1241    }
1242    let n = n as usize;
1243
1244    let mut out = Vec::new();
1245    for chunk in arr.chunks(n) {
1246        out.push(Value::Array(chunk.to_vec()));
1247    }
1248    Ok(Value::Array(out))
1249}
1250
1251fn op_data_flat_map(args: &[Value], ctx: &mut Context) -> EvalResult {
1252    let rows = evaluate(require_arg(args, 0, "data.flat_map")?, ctx)?;
1253    let path = evaluate(require_arg(args, 1, "data.flat_map")?, ctx)?;
1254    let arr = rows
1255        .as_array()
1256        .ok_or_else(|| EvalError::new("data.flat_map first arg must be array"))?;
1257    let Some(path) = path.as_str() else {
1258        return Err(EvalError::new(
1259            "data.flat_map second arg must be path string",
1260        ));
1261    };
1262    let segs = parse_path(path).ok_or_else(|| EvalError::new("data.flat_map path is invalid"))?;
1263
1264    let mut out = Vec::new();
1265    for row in arr {
1266        let Some(value) = value_get_by_segments(row, &segs) else {
1267            continue;
1268        };
1269        match value {
1270            Value::Array(items) => {
1271                for item in items {
1272                    let mut next = row.clone();
1273                    value_set_by_segments(&mut next, &segs, item);
1274                    out.push(next);
1275                }
1276            }
1277            Value::Null => {}
1278            other => {
1279                let mut next = row.clone();
1280                value_set_by_segments(&mut next, &segs, other);
1281                out.push(next);
1282            }
1283        }
1284    }
1285
1286    Ok(Value::Array(out))
1287}
1288
1289fn op_data_distinct(args: &[Value], ctx: &mut Context) -> EvalResult {
1290    let rows = evaluate(require_arg(args, 0, "data.distinct")?, ctx)?;
1291    let arr = rows
1292        .as_array()
1293        .ok_or_else(|| EvalError::new("data.distinct first arg must be array"))?;
1294
1295    let key_paths = if let Some(arg) = args.get(1) {
1296        let keys = evaluate(arg, ctx)?;
1297        keys.as_array().map(|list| {
1298            list.iter()
1299                .filter_map(|v| v.as_str())
1300                .filter_map(parse_path)
1301                .collect::<Vec<_>>()
1302        })
1303    } else {
1304        None
1305    };
1306
1307    let mut seen = HashSet::new();
1308    let mut out = Vec::new();
1309    for row in arr {
1310        let sig = if let Some(paths) = &key_paths {
1311            let key_vals: Vec<Value> = paths
1312                .iter()
1313                .map(|segs| value_get_by_segments(row, segs).unwrap_or(Value::Null))
1314                .collect();
1315            serde_json::to_string(&key_vals).unwrap_or_else(|_| "[]".to_string())
1316        } else {
1317            serde_json::to_string(row).unwrap_or_else(|_| "null".to_string())
1318        };
1319
1320        if seen.insert(sig) {
1321            out.push(row.clone());
1322        }
1323    }
1324
1325    Ok(Value::Array(out))
1326}
1327
1328fn op_data_sort_by(args: &[Value], ctx: &mut Context) -> EvalResult {
1329    let rows = evaluate(require_arg(args, 0, "data.sort_by")?, ctx)?;
1330    let spec = evaluate(require_arg(args, 1, "data.sort_by")?, ctx)?;
1331    let arr = rows
1332        .as_array()
1333        .ok_or_else(|| EvalError::new("data.sort_by first arg must be array"))?;
1334    let spec_arr = spec
1335        .as_array()
1336        .ok_or_else(|| EvalError::new("data.sort_by second arg must be array"))?;
1337
1338    let mut order_specs: Vec<(Vec<PathSegment>, bool)> = Vec::new();
1339    for s in spec_arr {
1340        match s {
1341            Value::String(path) => {
1342                if let Some(segs) = parse_path(path) {
1343                    order_specs.push((segs, true));
1344                }
1345            }
1346            Value::Object(obj) => {
1347                let Some(col) = obj.get("col").and_then(Value::as_str) else {
1348                    continue;
1349                };
1350                let asc = !obj
1351                    .get("dir")
1352                    .and_then(Value::as_str)
1353                    .map(|d| d.eq_ignore_ascii_case("desc"))
1354                    .unwrap_or(false);
1355                if let Some(segs) = parse_path(col) {
1356                    order_specs.push((segs, asc));
1357                }
1358            }
1359            _ => {}
1360        }
1361    }
1362
1363    let mut out = arr.clone();
1364    out.sort_by(|a, b| {
1365        for (segs, asc) in &order_specs {
1366            let av = value_get_by_segments(a, segs).unwrap_or(Value::Null);
1367            let bv = value_get_by_segments(b, segs).unwrap_or(Value::Null);
1368            let ord = cmp_numbers_or_strings(&av, &bv).unwrap_or(std::cmp::Ordering::Equal);
1369            if ord != std::cmp::Ordering::Equal {
1370                return if *asc { ord } else { ord.reverse() };
1371            }
1372        }
1373        std::cmp::Ordering::Equal
1374    });
1375
1376    Ok(Value::Array(out))
1377}
1378
1379fn op_data_group_by(args: &[Value], ctx: &mut Context) -> EvalResult {
1380    let rows = evaluate(require_arg(args, 0, "data.group_by")?, ctx)?;
1381    let keys = evaluate(require_arg(args, 1, "data.group_by")?, ctx)?;
1382    let arr = rows
1383        .as_array()
1384        .ok_or_else(|| EvalError::new("data.group_by first arg must be array"))?;
1385    let key_list = keys
1386        .as_array()
1387        .ok_or_else(|| EvalError::new("data.group_by second arg must be array"))?;
1388
1389    let mut parsed_keys: Vec<(String, Vec<PathSegment>)> = Vec::new();
1390    for k in key_list {
1391        let Some(path) = k.as_str() else {
1392            continue;
1393        };
1394        if let Some(segs) = parse_path(path) {
1395            parsed_keys.push((path.to_string(), segs));
1396        }
1397    }
1398
1399    let mut order: Vec<(Map<String, Value>, Vec<Value>)> = Vec::new();
1400    let mut index: HashMap<String, usize> = HashMap::new();
1401    for row in arr {
1402        let mut key_obj = Map::new();
1403        let mut key_vals = Vec::new();
1404        for (raw, segs) in &parsed_keys {
1405            let v = value_get_by_segments(row, segs).unwrap_or(Value::Null);
1406            key_vals.push(v.clone());
1407            key_obj.insert(raw.clone(), v);
1408        }
1409        let sig = serde_json::to_string(&key_vals).unwrap_or_else(|_| "[]".to_string());
1410        if let Some(i) = index.get(&sig).copied() {
1411            order[i].1.push(row.clone());
1412        } else {
1413            index.insert(sig, order.len());
1414            order.push((key_obj, vec![row.clone()]));
1415        }
1416    }
1417
1418    let mut out = Vec::with_capacity(order.len());
1419    for (keys_obj, grouped_rows) in order {
1420        let mut obj = keys_obj;
1421        obj.insert("_rows".to_string(), Value::Array(grouped_rows));
1422        out.push(Value::Object(obj));
1423    }
1424    Ok(Value::Array(out))
1425}
1426
1427fn op_data_aggregate(args: &[Value], ctx: &mut Context) -> EvalResult {
1428    let groups = evaluate(require_arg(args, 0, "data.aggregate")?, ctx)?;
1429    let spec = evaluate(require_arg(args, 1, "data.aggregate")?, ctx)?;
1430    let groups_arr = groups
1431        .as_array()
1432        .ok_or_else(|| EvalError::new("data.aggregate first arg must be array"))?;
1433    let spec_obj = spec
1434        .as_object()
1435        .ok_or_else(|| EvalError::new("data.aggregate second arg must be object"))?;
1436
1437    let mut parsed_spec: Vec<(String, AggExpr)> = Vec::new();
1438    for (out_key, expr_val) in spec_obj {
1439        let Some(expr) = expr_val.as_str() else {
1440            continue;
1441        };
1442        if let Some(parsed) = parse_agg_expr(expr) {
1443            parsed_spec.push((out_key.clone(), parsed));
1444        }
1445    }
1446
1447    let mut out = Vec::with_capacity(groups_arr.len());
1448    for group in groups_arr {
1449        let Some(group_obj) = group.as_object() else {
1450            continue;
1451        };
1452        let rows = group_obj
1453            .get("_rows")
1454            .and_then(Value::as_array)
1455            .cloned()
1456            .unwrap_or_default();
1457
1458        let mut next = Map::new();
1459        for (k, v) in group_obj {
1460            if k != "_rows" {
1461                next.insert(k.clone(), v.clone());
1462            }
1463        }
1464
1465        for (out_key, expr) in &parsed_spec {
1466            let value = match expr {
1467                AggExpr::CountAll => json!(rows.len()),
1468                AggExpr::CountField(path) => {
1469                    let n = rows
1470                        .iter()
1471                        .filter(|r| value_get_by_segments(r, path).is_some())
1472                        .count();
1473                    json!(n)
1474                }
1475                AggExpr::Sum(path) => {
1476                    let mut sum = 0.0f64;
1477                    for r in &rows {
1478                        if let Some(v) = value_get_by_segments(r, path) {
1479                            sum += as_f64(&v).unwrap_or(0.0);
1480                        }
1481                    }
1482                    json!(sum)
1483                }
1484                AggExpr::Avg(path) => {
1485                    let mut sum = 0.0f64;
1486                    let mut n = 0usize;
1487                    for r in &rows {
1488                        if let Some(v) = value_get_by_segments(r, path) {
1489                            sum += as_f64(&v).unwrap_or(0.0);
1490                            n += 1;
1491                        }
1492                    }
1493                    json!(if n == 0 { 0.0 } else { sum / n as f64 })
1494                }
1495                AggExpr::Min(path) => {
1496                    let mut best: Option<Value> = None;
1497                    for r in &rows {
1498                        if let Some(v) = value_get_by_segments(r, path) {
1499                            if let Some(cur) = &best {
1500                                if cmp_numbers_or_strings(&v, cur).is_some_and(|ord| ord.is_lt()) {
1501                                    best = Some(v);
1502                                }
1503                            } else {
1504                                best = Some(v);
1505                            }
1506                        }
1507                    }
1508                    best.unwrap_or(Value::Null)
1509                }
1510                AggExpr::Max(path) => {
1511                    let mut best: Option<Value> = None;
1512                    for r in &rows {
1513                        if let Some(v) = value_get_by_segments(r, path) {
1514                            if let Some(cur) = &best {
1515                                if cmp_numbers_or_strings(&v, cur).is_some_and(|ord| ord.is_gt()) {
1516                                    best = Some(v);
1517                                }
1518                            } else {
1519                                best = Some(v);
1520                            }
1521                        }
1522                    }
1523                    best.unwrap_or(Value::Null)
1524                }
1525            };
1526            next.insert(out_key.clone(), value);
1527        }
1528
1529        out.push(Value::Object(next));
1530    }
1531
1532    Ok(Value::Array(out))
1533}
1534
1535fn op_data_join(args: &[Value], ctx: &mut Context) -> EvalResult {
1536    let left = evaluate(require_arg(args, 0, "data.join")?, ctx)?;
1537    let right = evaluate(require_arg(args, 1, "data.join")?, ctx)?;
1538    let options = evaluate(require_arg(args, 2, "data.join")?, ctx)?;
1539    let left_arr = left
1540        .as_array()
1541        .ok_or_else(|| EvalError::new("data.join first arg must be array"))?;
1542    let right_arr = right
1543        .as_array()
1544        .ok_or_else(|| EvalError::new("data.join second arg must be array"))?;
1545    let options = options
1546        .as_object()
1547        .ok_or_else(|| EvalError::new("data.join third arg must be object"))?;
1548
1549    let on_fields = options
1550        .get("on")
1551        .and_then(Value::as_array)
1552        .ok_or_else(|| EvalError::new("data.join requires options.on array"))?;
1553    let join_type = options
1554        .get("type")
1555        .and_then(Value::as_str)
1556        .unwrap_or("inner")
1557        .to_ascii_lowercase();
1558    let is_left_join = join_type == "left";
1559
1560    let mut parsed_on = Vec::new();
1561    for f in on_fields {
1562        let Some(path) = f.as_str() else {
1563            continue;
1564        };
1565        if let Some(segs) = parse_path(path) {
1566            parsed_on.push((path.to_string(), segs));
1567        }
1568    }
1569
1570    let mut right_index: HashMap<String, Vec<Value>> = HashMap::new();
1571    for row in right_arr {
1572        let key_vals: Vec<Value> = parsed_on
1573            .iter()
1574            .map(|(_, segs)| value_get_by_segments(row, segs).unwrap_or(Value::Null))
1575            .collect();
1576        let sig = serde_json::to_string(&key_vals).unwrap_or_else(|_| "[]".to_string());
1577        right_index.entry(sig).or_default().push(row.clone());
1578    }
1579
1580    let mut out = Vec::new();
1581    for l in left_arr {
1582        let key_vals: Vec<Value> = parsed_on
1583            .iter()
1584            .map(|(_, segs)| value_get_by_segments(l, segs).unwrap_or(Value::Null))
1585            .collect();
1586        let sig = serde_json::to_string(&key_vals).unwrap_or_else(|_| "[]".to_string());
1587        let matched = right_index.get(&sig);
1588
1589        if let Some(rs) = matched {
1590            for r in rs {
1591                let mut merged = l.clone();
1592                if let (Some(lo), Some(ro)) = (merged.as_object_mut(), r.as_object()) {
1593                    for (k, v) in ro {
1594                        if lo.contains_key(k) {
1595                            lo.insert(format!("right_{k}"), v.clone());
1596                        } else {
1597                            lo.insert(k.clone(), v.clone());
1598                        }
1599                    }
1600                }
1601                out.push(merged);
1602            }
1603        } else if is_left_join {
1604            out.push(l.clone());
1605        }
1606    }
1607
1608    Ok(Value::Array(out))
1609}
1610
1611fn op_log(args: &[Value], ctx: &mut Context) -> EvalResult {
1612    let mut out = Vec::with_capacity(args.len());
1613    for arg in args {
1614        out.push(evaluate(arg, ctx)?);
1615    }
1616    eprintln!("[josie.log] {}", Value::Array(out.clone()));
1617    Ok(Value::Array(out))
1618}
1619
1620fn op_len(args: &[Value], ctx: &mut Context) -> EvalResult {
1621    let v = evaluate(require_arg(args, 0, "len")?, ctx)?;
1622    let n = match v {
1623        Value::String(s) => s.chars().count(),
1624        Value::Array(a) => a.len(),
1625        Value::Object(o) => o.len(),
1626        _ => 0,
1627    };
1628    Ok(json!(n))
1629}
1630
1631fn op_push(args: &[Value], ctx: &mut Context) -> EvalResult {
1632    let list = evaluate(require_arg(args, 0, "push")?, ctx)?;
1633    let item = evaluate(require_arg(args, 1, "push")?, ctx)?;
1634    let mut arr = match list {
1635        Value::Array(a) => a,
1636        _ => Vec::new(),
1637    };
1638    arr.push(item);
1639    Ok(Value::Array(arr))
1640}
1641
1642fn op_get(args: &[Value], ctx: &mut Context) -> EvalResult {
1643    let collection = evaluate(require_arg(args, 0, "get")?, ctx)?;
1644    let key = evaluate(require_arg(args, 1, "get")?, ctx)?;
1645    let out = match (collection, key) {
1646        (Value::Object(obj), Value::String(k)) => obj.get(&k).cloned(),
1647        (Value::Array(arr), Value::Number(n)) => {
1648            n.as_u64().and_then(|i| arr.get(i as usize).cloned())
1649        }
1650        (Value::Array(arr), Value::String(s)) => {
1651            s.parse::<usize>().ok().and_then(|i| arr.get(i).cloned())
1652        }
1653        _ => None,
1654    };
1655    Ok(out.unwrap_or(Value::Null))
1656}
1657
1658fn op_do(args: &[Value], ctx: &mut Context) -> EvalResult {
1659    let mut last = Value::Null;
1660    for arg in args {
1661        last = evaluate(arg, ctx)?;
1662    }
1663    Ok(last)
1664}
1665
1666fn op_pipe(args: &[Value], ctx: &mut Context) -> EvalResult {
1667    let prev_pipe = ctx.state.client.get("pipe").cloned();
1668    let out = (|| {
1669        let mut last = Value::Null;
1670        for step in args {
1671            let mut pipe_obj = Map::new();
1672            pipe_obj.insert("prev".to_string(), last.clone());
1673            ctx.state
1674                .client
1675                .insert("pipe".to_string(), Value::Object(pipe_obj));
1676            last = evaluate(step, ctx)?;
1677        }
1678        Ok(last)
1679    })();
1680
1681    if let Some(old) = prev_pipe {
1682        ctx.state.client.insert("pipe".to_string(), old);
1683    } else {
1684        ctx.state.client.remove("pipe");
1685    }
1686    out
1687}
1688
1689fn has_side_effects(node: &Value) -> bool {
1690    match node {
1691        Value::Array(arr) => {
1692            if let Some(op) = arr.first().and_then(Value::as_str) {
1693                match op {
1694                    "set" | "def" | "effect" | "log" | "push" | "w.event.prevent" => {
1695                        return true;
1696                    }
1697                    _ => {}
1698                }
1699            }
1700            arr.iter().any(has_side_effects)
1701        }
1702        Value::Object(obj) => obj.values().any(has_side_effects),
1703        _ => false,
1704    }
1705}
1706
1707fn memo_key_from_args(args: &[Value]) -> Option<String> {
1708    let cacheable = args.iter().all(|v| {
1709        matches!(
1710            v,
1711            Value::Null | Value::Bool(_) | Value::Number(_) | Value::String(_)
1712        )
1713    });
1714    if !cacheable {
1715        return None;
1716    }
1717    serde_json::to_string(args).ok()
1718}
1719
1720fn op_def(args: &[Value], ctx: &mut Context) -> EvalResult {
1721    let name = require_arg(args, 0, "def")?
1722        .as_str()
1723        .ok_or_else(|| EvalError::new("def name must be string"))?
1724        .to_string();
1725    let params = args
1726        .get(1)
1727        .and_then(Value::as_array)
1728        .map(|arr| {
1729            arr.iter()
1730                .filter_map(|v| v.as_str().map(ToString::to_string))
1731                .collect::<Vec<_>>()
1732        })
1733        .unwrap_or_default();
1734    let body = require_arg(args, 2, "def")?.clone();
1735    let memoizable = !has_side_effects(&body);
1736    ctx.state.fns.insert(
1737        name.clone(),
1738        FunctionDef {
1739            params,
1740            body,
1741            memoizable,
1742        },
1743    );
1744    // Def replacement invalidates previous memoized calls for this function.
1745    ctx.state.call_memo.remove(&name);
1746    Ok(Value::Null)
1747}
1748
1749/// Invokes a user-defined function (`def`) or a registered operator by name.
1750///
1751/// # Call protocol
1752/// ```text
1753/// (call <name> <arg0> <arg1> ...)
1754/// ```
1755///
1756/// # Performance contract
1757/// Parameters are bound into a lightweight call-local frame (`call_locals`)
1758/// rather than mutating/cloning the full state maps. Variable resolution checks
1759/// the newest local frame first, then falls back to client/server paths.
1760/// Cost per call frame: O(param_count) without touching large state maps.
1761///
1762/// This is critical for recursive functions (e.g. Fibonacci), eliminating both
1763/// full `State::clone()` and repeated client-map insert/remove churn per frame.
1764fn op_call(args: &[Value], ctx: &mut Context) -> EvalResult {
1765    let name = if let Some(s) = require_arg(args, 0, "call")?.as_str() {
1766        s.to_string()
1767    } else {
1768        let name_val = evaluate(require_arg(args, 0, "call")?, ctx)?;
1769        name_val
1770            .as_str()
1771            .ok_or_else(|| EvalError::new("call function name must be string"))?
1772            .to_string()
1773    };
1774
1775    if let Some(def) = ctx.state.fns.get(&name).cloned() {
1776        // Evaluate args before touching state.
1777        let mut evaluated_args = Vec::with_capacity(args.len().saturating_sub(1));
1778        for arg in &args[1..] {
1779            evaluated_args.push(evaluate(arg, ctx)?);
1780        }
1781
1782        let memo_key = if def.memoizable {
1783            memo_key_from_args(&evaluated_args)
1784        } else {
1785            None
1786        };
1787        if let Some(key) = memo_key.as_ref() {
1788            if let Some(v) = ctx
1789                .state
1790                .call_memo
1791                .get(&name)
1792                .and_then(|entry| entry.get(key))
1793                .cloned()
1794            {
1795                return Ok(v);
1796            }
1797        }
1798
1799        // Bind params into a dedicated call-local frame.
1800        let mut frame: HashMap<String, Value> = HashMap::with_capacity(def.params.len());
1801        for (idx, param) in def.params.iter().enumerate() {
1802            let new_val = evaluated_args.get(idx).cloned().unwrap_or(Value::Null);
1803            frame.insert(param.clone(), new_val);
1804        }
1805        ctx.state.call_locals.push(frame);
1806
1807        let result = evaluate(&def.body, ctx);
1808
1809        // Pop local frame and restore caller scope.
1810        let _ = ctx.state.call_locals.pop();
1811
1812        if let (Some(key), Ok(v)) = (memo_key, &result) {
1813            ctx.state
1814                .call_memo
1815                .entry(name.clone())
1816                .or_default()
1817                .insert(key, v.clone());
1818        }
1819
1820        return result;
1821    }
1822
1823    if let Some(op) = ctx.operators.get(&name) {
1824        return op(&args[1..], ctx);
1825    }
1826
1827    Err(EvalError::new(format!("function '{name}' not found")))
1828}
1829
1830fn with_item_scope<F>(ctx: &mut Context, item: Value, index: usize, f: F) -> EvalResult
1831where
1832    F: FnOnce(&mut Context) -> EvalResult,
1833{
1834    let old_item = ctx.state.client.insert("item".to_string(), item);
1835    let old_index = ctx.state.client.insert("index".to_string(), json!(index));
1836    let out = f(ctx);
1837    if let Some(prev) = old_item {
1838        ctx.state.client.insert("item".to_string(), prev);
1839    } else {
1840        ctx.state.client.remove("item");
1841    }
1842    if let Some(prev) = old_index {
1843        ctx.state.client.insert("index".to_string(), prev);
1844    } else {
1845        ctx.state.client.remove("index");
1846    }
1847    out
1848}
1849
1850fn op_map(args: &[Value], ctx: &mut Context) -> EvalResult {
1851    let list = evaluate(require_arg(args, 0, "map")?, ctx)?;
1852    let expr = require_arg(args, 1, "map")?.clone();
1853    let arr = list
1854        .as_array()
1855        .cloned()
1856        .ok_or_else(|| EvalError::new("map first arg must evaluate to array"))?;
1857    let mut out = Vec::with_capacity(arr.len());
1858    for (i, item) in arr.into_iter().enumerate() {
1859        out.push(with_item_scope(ctx, item, i, |inner| {
1860            evaluate(&expr, inner)
1861        })?);
1862    }
1863    Ok(Value::Array(out))
1864}
1865
1866fn op_filter(args: &[Value], ctx: &mut Context) -> EvalResult {
1867    let list = evaluate(require_arg(args, 0, "filter")?, ctx)?;
1868    let expr = require_arg(args, 1, "filter")?.clone();
1869    let arr = list
1870        .as_array()
1871        .cloned()
1872        .ok_or_else(|| EvalError::new("filter first arg must evaluate to array"))?;
1873    let mut out = Vec::new();
1874    for (i, item) in arr.into_iter().enumerate() {
1875        let keep = with_item_scope(ctx, item.clone(), i, |inner| evaluate(&expr, inner))?;
1876        if truthy(&keep) {
1877            out.push(item);
1878        }
1879    }
1880    Ok(Value::Array(out))
1881}
1882
1883fn op_match(args: &[Value], ctx: &mut Context) -> EvalResult {
1884    let value = evaluate(require_arg(args, 0, "match")?, ctx)?;
1885    let mut i = 1usize;
1886    while i + 1 < args.len() {
1887        let pat = evaluate(&args[i], ctx)?;
1888        if pat == Value::String("_".to_string()) || pat == value {
1889            return evaluate(&args[i + 1], ctx);
1890        }
1891        i += 2;
1892    }
1893    Ok(Value::Null)
1894}
1895
1896fn op_event_value(_args: &[Value], ctx: &mut Context) -> EvalResult {
1897    Ok(ctx.event.map(|e| e.value.clone()).unwrap_or(Value::Null))
1898}
1899
1900fn op_event_key(_args: &[Value], ctx: &mut Context) -> EvalResult {
1901    Ok(ctx.event.map(|e| e.key.clone()).unwrap_or(Value::Null))
1902}
1903
1904fn op_event_prevent(_args: &[Value], ctx: &mut Context) -> EvalResult {
1905    Ok(Value::Bool(ctx.event.map(|e| e.prevent).unwrap_or(false)))
1906}
1907
1908fn op_effect(args: &[Value], ctx: &mut Context) -> EvalResult {
1909    let mut last = Value::Null;
1910    for arg in args {
1911        last = evaluate(arg, ctx)?;
1912    }
1913    Ok(last)
1914}
1915
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Evaluates `expr` against `state` using a freshly constructed operator
    /// table and no event.
    fn eval_in(state: &mut State, expr: &Value) -> EvalResult {
        let operators = Operators::new();
        let mut ctx = Context {
            state,
            operators: &operators,
            event: None,
        };
        evaluate(expr, &mut ctx)
    }

    /// Evaluates `expr` against an empty state, panicking on evaluation errors.
    fn eval_expr(expr: Value) -> Value {
        let mut scratch = State::new();
        eval_in(&mut scratch, &expr).expect("evaluate")
    }

    #[test]
    fn util_path_ops_work() {
        let payload = json!({
            "headers": {
                "Authorization": "Bearer A"
            },
            "user": {
                "id": 7,
                "profile": {
                    "email": "x@example.com"
                }
            },
            "items": [{"sku":"a"}, {"sku":"b"}]
        });

        // Nested lookup by dotted path.
        assert_eq!(
            eval_expr(json!([
                "util.get_path",
                payload.clone(),
                "user.profile.email",
                null
            ])),
            json!("x@example.com")
        );

        // Candidate paths are case-sensitive here: only the second one exists.
        assert_eq!(
            eval_expr(json!([
                "util.one_of",
                payload.clone(),
                [
                    "headers.authorization",
                    "headers.Authorization",
                    "header.auth"
                ],
                null
            ])),
            json!("Bearer A")
        );

        let picked = eval_expr(json!([
            "util.pick",
            payload,
            ["user.id", "user.profile.email", "items[1].sku"]
        ]));
        let expected_pick = json!({
            "user": {
                "id": 7,
                "profile": { "email": "x@example.com" }
            },
            "items": [null, {"sku":"b"}]
        });
        assert_eq!(picked, expected_pick);

        let nested = json!({"a":{"b":[1,2,3]}});
        assert_eq!(
            eval_expr(json!(["util.exists", nested, "a.b[1]"])),
            json!(true)
        );

        let omit_input = json!({"a":1,"b":{"c":2},"arr":[1,2,3]});
        assert_eq!(
            eval_expr(json!(["util.omit", omit_input, ["b.c", "arr[1]"]])),
            json!({"a":1,"b":{},"arr":[1,null,3]})
        );
    }

    #[test]
    fn util_replace_ops_work() {
        assert_eq!(
            eval_expr(json!(["util.replace", "a-b-c", "-", "_"])),
            json!("a_b-c")
        );
        assert_eq!(
            eval_expr(json!(["util.replace_all", "a-b-c", "-", "_"])),
            json!("a_b_c")
        );
        assert_eq!(eval_expr(json!(["util.as_array", 9])), json!([9]));
        assert_eq!(eval_expr(json!(["util.to_bool", "YES"])), json!(true));
        assert_eq!(
            eval_expr(json!(["util.split", "a,b,c", ","])),
            json!(["a", "b", "c"])
        );
        assert_eq!(
            eval_expr(json!(["util.join", ["a", 2, true], "-"])),
            json!("a-2-true")
        );
    }

    #[test]
    fn util_regex_ops_work() {
        let matched = eval_expr(json!([
            "util.regex_match",
            "HELLO",
            "^hello$",
            {"case_insensitive": true}
        ]));
        assert_eq!(matched, json!(true));

        assert_eq!(
            eval_expr(json!(["util.regex_find", "order-123-x", "\\d+"])),
            json!("123")
        );
        assert_eq!(
            eval_expr(json!(["util.regex_find_all", "a1b22c333", "\\d+"])),
            json!(["1", "22", "333"])
        );

        // Replacement covers every match unless `all` is switched off.
        assert_eq!(
            eval_expr(json!(["util.regex_replace", "a1b2", "\\d+", "X"])),
            json!("aXbX")
        );
        let first_only = eval_expr(json!([
            "util.regex_replace",
            "a1b2",
            "\\d+",
            "X",
            {"all": false}
        ]));
        assert_eq!(first_only, json!("aXb2"));
    }

    #[test]
    fn data_ops_work() {
        let rows = json!([
            {"id":"1","amount":"12.5","name":"ALICE","extra":{"k":"x"}},
            {"id":"2","amount":"7","name":"BOB","extra":{"k":"y"}}
        ]);

        let selected = eval_expr(json!([
            "data.select",
            rows.clone(),
            ["id", "extra.k", "missing"]
        ]));
        assert_eq!(
            selected,
            json!([
                {"id":"1","extra":{"k":"x"},"missing":null},
                {"id":"2","extra":{"k":"y"},"missing":null}
            ])
        );

        let renamed = eval_expr(json!([
            "data.rename",
            rows.clone(),
            {"name":"full_name", "extra.k":"kind"}
        ]));
        assert_eq!(
            renamed,
            json!([
                {"id":"1","amount":"12.5","full_name":"ALICE","extra":{},"kind":"x"},
                {"id":"2","amount":"7","full_name":"BOB","extra":{},"kind":"y"}
            ])
        );

        let cast = eval_expr(json!(["data.cast", rows, {"id":"int", "amount":"float"}]));
        assert_eq!(
            cast,
            json!([
                {"id":1,"amount":12.5,"name":"ALICE","extra":{"k":"x"}},
                {"id":2,"amount":7.0,"name":"BOB","extra":{"k":"y"}}
            ])
        );

        assert_eq!(
            eval_expr(json!(["data.chunk", [1, 2, 3, 4, 5], 2])),
            json!([[1, 2], [3, 4], [5]])
        );

        let flattened = eval_expr(json!([
            "data.flat_map",
            [
                {"id":1,"items":[{"sku":"a"},{"sku":"b"}]},
                {"id":2,"items":[{"sku":"c"}]}
            ],
            "items"
        ]));
        assert_eq!(
            flattened,
            json!([
                {"id":1,"items":{"sku":"a"}},
                {"id":1,"items":{"sku":"b"}},
                {"id":2,"items":{"sku":"c"}}
            ])
        );

        let distinct = eval_expr(json!([
            "data.distinct",
            [
                {"id":1,"email":"a@x"},
                {"id":2,"email":"a@x"},
                {"id":3,"email":"b@x"}
            ],
            ["email"]
        ]));
        assert_eq!(
            distinct,
            json!([
                {"id":1,"email":"a@x"},
                {"id":3,"email":"b@x"}
            ])
        );

        let sorted = eval_expr(json!([
            "data.sort_by",
            [
                {"id":1,"score":9},
                {"id":2,"score":3},
                {"id":3,"score":7}
            ],
            [{"col":"score","dir":"desc"}]
        ]));
        assert_eq!(
            sorted,
            json!([
                {"id":1,"score":9},
                {"id":3,"score":7},
                {"id":2,"score":3}
            ])
        );

        // The aggregate step consumes the output of the group_by step.
        let grouped = eval_expr(json!([
            "data.group_by",
            [
                {"user":"u1","amount":10},
                {"user":"u1","amount":5},
                {"user":"u2","amount":3}
            ],
            ["user"]
        ]));
        let aggregated = eval_expr(json!([
            "data.aggregate",
            grouped,
            {"n":"count(*)","total":"sum(amount)"}
        ]));
        assert_eq!(
            aggregated,
            json!([
                {"user":"u1","n":2,"total":15.0},
                {"user":"u2","n":1,"total":3.0}
            ])
        );

        let joined = eval_expr(json!([
            "data.join",
            [
                {"user":"u1","l":1},
                {"user":"u2","l":2}
            ],
            [
                {"user":"u1","r":"A"}
            ],
            {"on":["user"],"type":"left"}
        ]));
        assert_eq!(
            joined,
            json!([
                {"user":"u1","l":1,"right_user":"u1","r":"A"},
                {"user":"u2","l":2}
            ])
        );
    }

    #[test]
    fn pipe_restores_scope_on_error_without_previous_pipe() {
        let mut state = State::new();
        state.client.insert("stable".to_string(), json!("ok"));

        let program = json!(["pipe", ["set", "x", 1], ["call", "missing_function"]]);
        let outcome = eval_in(&mut state, &program);

        assert!(outcome.is_err());
        // No `pipe` key existed beforehand, so none may be left behind.
        assert!(!state.client.contains_key("pipe"));
        assert_eq!(state.client.get("stable"), Some(&json!("ok")));
    }

    #[test]
    fn pipe_restores_previous_pipe_on_error() {
        let seeded = json!({"prev":"seed"});
        let mut state = State::new();
        state.client.insert("pipe".to_string(), seeded.clone());

        let program = json!(["pipe", ["call", "missing_function"]]);
        let outcome = eval_in(&mut state, &program);

        assert!(outcome.is_err());
        // The pre-existing `pipe` value must be back in place after the failure.
        assert_eq!(state.client.get("pipe"), Some(&seeded));
    }
}
2203}