1use regex::RegexBuilder;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map, Value, json};
14use std::collections::{HashMap, HashSet};
15
16pub type JsonNode = Value;
18pub type EvalResult = Result<Value, EvalError>;
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct EvalError {
24 pub message: String,
25}
26
27impl EvalError {
28 pub fn new(msg: impl Into<String>) -> Self {
30 Self {
31 message: msg.into(),
32 }
33 }
34}
35
36#[derive(Debug, Clone)]
38pub struct FunctionDef {
39 pub params: Vec<String>,
40 pub body: Value,
41 pub memoizable: bool,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct State {
47 pub server: Map<String, Value>,
48 pub client: Map<String, Value>,
49 #[serde(skip)]
50 pub fns: HashMap<String, FunctionDef>,
51 #[serde(skip)]
52 pub call_locals: Vec<HashMap<String, Value>>,
53 #[serde(skip)]
54 pub call_memo: HashMap<String, HashMap<String, Value>>,
55}
56
57impl State {
58 pub fn new() -> Self {
60 Self {
61 server: Map::new(),
62 client: Map::new(),
63 fns: HashMap::new(),
64 call_locals: Vec::new(),
65 call_memo: HashMap::new(),
66 }
67 }
68}
69
70impl Default for State {
71 fn default() -> Self {
72 Self::new()
73 }
74}
75
76#[derive(Debug, Clone)]
78pub struct EventContext {
79 pub value: Value,
80 pub key: Value,
81 pub prevent: bool,
82}
83
84impl Default for EventContext {
85 fn default() -> Self {
86 Self {
87 value: Value::Null,
88 key: Value::Null,
89 prevent: false,
90 }
91 }
92}
93
94pub struct Context<'a> {
96 pub state: &'a mut State,
97 pub operators: &'a Operators,
98 pub event: Option<&'a EventContext>,
99}
100
101pub type Operator = fn(args: &[Value], ctx: &mut Context) -> EvalResult;
103
104pub struct Operators {
108 ops: HashMap<String, Operator>,
109}
110
111impl Operators {
112 pub fn new() -> Self {
114 let mut ops: HashMap<String, Operator> = HashMap::new();
115 ops.insert("var".into(), op_var as Operator);
116 ops.insert("set".into(), op_set as Operator);
117 ops.insert("+".into(), op_add as Operator);
118 ops.insert("-".into(), op_sub as Operator);
119 ops.insert("*".into(), op_mul as Operator);
120 ops.insert("/".into(), op_div as Operator);
121 ops.insert("%".into(), op_mod as Operator);
122 ops.insert("==".into(), op_eq as Operator);
123 ops.insert("!=".into(), op_neq as Operator);
124 ops.insert(">".into(), op_gt as Operator);
125 ops.insert("<".into(), op_lt as Operator);
126 ops.insert(">=".into(), op_gte as Operator);
127 ops.insert("<=".into(), op_lte as Operator);
128 ops.insert("if".into(), op_if as Operator);
129 ops.insert("&&".into(), op_and as Operator);
130 ops.insert("||".into(), op_or as Operator);
131 ops.insert("!".into(), op_not as Operator);
132 ops.insert("util.concat".into(), op_concat as Operator);
133 ops.insert("util.lower".into(), op_lower as Operator);
134 ops.insert("util.upper".into(), op_upper as Operator);
135 ops.insert("util.contains".into(), op_contains as Operator);
136 ops.insert("util.template".into(), op_template as Operator);
137 ops.insert("util.to_int".into(), op_to_int as Operator);
138 ops.insert("util.to_float".into(), op_to_float as Operator);
139 ops.insert("util.to_string".into(), op_to_string as Operator);
140 ops.insert("util.trim".into(), op_trim as Operator);
141 ops.insert("util.str_len".into(), op_str_len as Operator);
142 ops.insert("util.get_path".into(), op_util_get_path as Operator);
143 ops.insert("util.one_of".into(), op_util_one_of as Operator);
144 ops.insert("util.pick".into(), op_util_pick as Operator);
145 ops.insert("util.exists".into(), op_util_exists as Operator);
146 ops.insert("util.omit".into(), op_util_omit as Operator);
147 ops.insert("util.as_array".into(), op_util_as_array as Operator);
148 ops.insert("util.to_bool".into(), op_util_to_bool as Operator);
149 ops.insert("util.replace".into(), op_util_replace as Operator);
150 ops.insert("util.replace_all".into(), op_util_replace_all as Operator);
151 ops.insert("util.split".into(), op_util_split as Operator);
152 ops.insert("util.join".into(), op_util_join as Operator);
153 ops.insert("util.regex_match".into(), op_util_regex_match as Operator);
154 ops.insert("util.regex_find".into(), op_util_regex_find as Operator);
155 ops.insert("util.regex_find_all".into(), op_util_regex_find_all as Operator);
156 ops.insert("util.regex_replace".into(), op_util_regex_replace as Operator);
157 ops.insert("log".into(), op_log as Operator);
158 ops.insert("len".into(), op_len as Operator);
159 ops.insert("push".into(), op_push as Operator);
160 ops.insert("get".into(), op_get as Operator);
161 ops.insert("data.select".into(), op_data_select as Operator);
162 ops.insert("data.rename".into(), op_data_rename as Operator);
163 ops.insert("data.cast".into(), op_data_cast as Operator);
164 ops.insert("data.chunk".into(), op_data_chunk as Operator);
165 ops.insert("data.flat_map".into(), op_data_flat_map as Operator);
166 ops.insert("data.distinct".into(), op_data_distinct as Operator);
167 ops.insert("data.sort_by".into(), op_data_sort_by as Operator);
168 ops.insert("data.group_by".into(), op_data_group_by as Operator);
169 ops.insert("data.aggregate".into(), op_data_aggregate as Operator);
170 ops.insert("data.join".into(), op_data_join as Operator);
171 ops.insert("do".into(), op_do as Operator);
172 ops.insert("pipe".into(), op_pipe as Operator);
173 ops.insert("def".into(), op_def as Operator);
174 ops.insert("call".into(), op_call as Operator);
175 ops.insert("map".into(), op_map as Operator);
176 ops.insert("filter".into(), op_filter as Operator);
177 ops.insert("match".into(), op_match as Operator);
178 ops.insert("w.event.value".into(), op_event_value as Operator);
179 ops.insert("w.event.key".into(), op_event_key as Operator);
180 ops.insert("w.event.prevent".into(), op_event_prevent as Operator);
181 ops.insert("effect".into(), op_effect as Operator);
182 Self { ops }
183 }
184
185 pub fn get(&self, name: &str) -> Option<Operator> {
187 self.ops.get(name).copied()
188 }
189
190 pub fn register(&mut self, name: impl Into<String>, operator: Operator) -> Option<Operator> {
192 self.ops.insert(name.into(), operator)
193 }
194}
195
196impl Default for Operators {
197 fn default() -> Self {
198 Self::new()
199 }
200}
201
202pub fn evaluate(node: &Value, ctx: &mut Context) -> EvalResult {
207 match node {
208 Value::Array(arr) => {
209 if arr.is_empty() {
210 return Ok(Value::Array(vec![]));
211 }
212 if let Some(op_name) = arr.first().and_then(|v| v.as_str()) {
213 if let Some(op) = ctx.operators.get(op_name) {
214 return op(&arr[1..], ctx);
215 }
216 }
217 let mut out = Vec::with_capacity(arr.len());
218 for item in arr {
219 out.push(evaluate(item, ctx)?);
220 }
221 Ok(Value::Array(out))
222 }
223 Value::Object(obj) => {
224 let mut out = Map::new();
225 for (k, v) in obj {
226 out.insert(k.clone(), evaluate(v, ctx)?);
227 }
228 Ok(Value::Object(out))
229 }
230 _ => Ok(node.clone()),
231 }
232}
233
234pub fn get_path(state: &Map<String, Value>, path: &Value) -> Option<Value> {
236 let path = path.as_str()?;
237 if let Some(rest) = path.strip_prefix("client.") {
238 return state
239 .get("client")
240 .and_then(Value::as_object)
241 .and_then(|m| map_get(m, rest));
242 }
243 if let Some(rest) = path.strip_prefix("server.") {
244 return state
245 .get("server")
246 .and_then(Value::as_object)
247 .and_then(|m| map_get(m, rest));
248 }
249 map_get(state, path)
250}
251
252pub fn set_path(state: &mut Map<String, Value>, path: &Value, value: Value) {
254 let Some(path) = path.as_str() else {
255 return;
256 };
257 if let Some(rest) = path.strip_prefix("client.") {
258 let client = state
259 .entry("client".to_string())
260 .or_insert_with(|| Value::Object(Map::new()));
261 if let Some(client_map) = client.as_object_mut() {
262 map_set(client_map, rest, value);
263 }
264 return;
265 }
266 if let Some(rest) = path.strip_prefix("server.") {
267 let server = state
268 .entry("server".to_string())
269 .or_insert_with(|| Value::Object(Map::new()));
270 if let Some(server_map) = server.as_object_mut() {
271 map_set(server_map, rest, value);
272 }
273 return;
274 }
275 map_set(state, path, value);
276}
277
278fn require_arg<'a>(args: &'a [Value], index: usize, op: &str) -> Result<&'a Value, EvalError> {
279 args.get(index)
280 .ok_or_else(|| EvalError::new(format!("{op} requires argument {index}")))
281}
282
283fn as_f64(value: &Value) -> Option<f64> {
284 match value {
285 Value::Number(n) => n.as_f64(),
286 Value::String(s) => s.parse::<f64>().ok(),
287 Value::Bool(b) => Some(if *b { 1.0 } else { 0.0 }),
288 _ => None,
289 }
290}
291
292fn as_i64(value: &Value) -> Option<i64> {
293 match value {
294 Value::Number(n) => n.as_i64().or_else(|| n.as_f64().map(|v| v as i64)),
295 Value::String(s) => s.parse::<i64>().ok(),
296 Value::Bool(b) => Some(if *b { 1 } else { 0 }),
297 _ => None,
298 }
299}
300
301fn truthy(v: &Value) -> bool {
302 match v {
303 Value::Null => false,
304 Value::Bool(b) => *b,
305 Value::Number(n) => n.as_f64().map(|f| f != 0.0).unwrap_or(false),
306 Value::String(s) => !s.is_empty(),
307 Value::Array(a) => !a.is_empty(),
308 Value::Object(o) => !o.is_empty(),
309 }
310}
311
312fn read_state_path(state: &State, path: &str) -> Option<Value> {
313 if !path.contains('.') {
314 for frame in state.call_locals.iter().rev() {
315 if let Some(v) = frame.get(path) {
316 return Some(v.clone());
317 }
318 }
319 }
320 if let Some(rest) = path.strip_prefix("client.") {
321 return map_get(&state.client, rest);
322 }
323 if let Some(rest) = path.strip_prefix("server.") {
324 return map_get(&state.server, rest);
325 }
326 map_get(&state.client, path).or_else(|| map_get(&state.server, path))
327}
328
329fn write_state_path(state: &mut State, path: &str, value: Value) {
330 if !path.contains('.') {
331 if let Some(frame) = state.call_locals.last_mut() {
332 if frame.contains_key(path) {
333 frame.insert(path.to_string(), value);
334 return;
335 }
336 }
337 }
338 if let Some(rest) = path.strip_prefix("client.") {
339 map_set(&mut state.client, rest, value);
340 return;
341 }
342 if let Some(rest) = path.strip_prefix("server.") {
343 map_set(&mut state.server, rest, value);
344 return;
345 }
346 map_set(&mut state.client, path, value);
347}
348
349fn map_get(map: &Map<String, Value>, path: &str) -> Option<Value> {
350 if path.is_empty() {
351 return Some(Value::Object(map.clone()));
352 }
353 let mut parts = path.split('.');
354 let first = parts.next()?;
355 let mut current = map.get(first)?;
356 for part in parts {
357 match current {
358 Value::Object(obj) => current = obj.get(part)?,
359 Value::Array(arr) => {
360 let idx = part.parse::<usize>().ok()?;
361 current = arr.get(idx)?;
362 }
363 _ => return None,
364 }
365 }
366 Some(current.clone())
367}
368
369fn map_set(map: &mut Map<String, Value>, path: &str, value: Value) {
370 let parts: Vec<&str> = path.split('.').filter(|p| !p.is_empty()).collect();
371 if parts.is_empty() {
372 return;
373 }
374 let mut cur = map;
375 for part in &parts[..parts.len() - 1] {
376 let entry = cur
377 .entry((*part).to_string())
378 .or_insert_with(|| Value::Object(Map::new()));
379 if !entry.is_object() {
380 *entry = Value::Object(Map::new());
381 }
382 cur = entry
383 .as_object_mut()
384 .expect("object ensured before descending");
385 }
386 cur.insert(parts[parts.len() - 1].to_string(), value);
387}
388
389#[derive(Debug, Clone)]
390enum PathSegment {
391 Key(String),
392 Index(usize),
393}
394
395fn parse_path(path: &str) -> Option<Vec<PathSegment>> {
396 if path.is_empty() {
397 return Some(Vec::new());
398 }
399
400 let chars: Vec<char> = path.chars().collect();
401 let mut i = 0usize;
402 let mut key = String::new();
403 let mut out = Vec::new();
404
405 while i < chars.len() {
406 match chars[i] {
407 '.' => {
408 if !key.is_empty() {
409 out.push(PathSegment::Key(std::mem::take(&mut key)));
410 }
411 i += 1;
412 }
413 '[' => {
414 if !key.is_empty() {
415 out.push(PathSegment::Key(std::mem::take(&mut key)));
416 }
417 i += 1;
418 let start = i;
419 while i < chars.len() && chars[i] != ']' {
420 i += 1;
421 }
422 if i >= chars.len() || i == start {
423 return None;
424 }
425 let idx_text: String = chars[start..i].iter().collect();
426 let idx = idx_text.parse::<usize>().ok()?;
427 out.push(PathSegment::Index(idx));
428 i += 1;
429 }
430 c => {
431 key.push(c);
432 i += 1;
433 }
434 }
435 }
436
437 if !key.is_empty() {
438 out.push(PathSegment::Key(key));
439 }
440 Some(out)
441}
442
443fn value_get_path(value: &Value, path: &str) -> Option<Value> {
444 let segs = parse_path(path)?;
445 if segs.is_empty() {
446 return Some(value.clone());
447 }
448 value_get_by_segments(value, &segs)
449}
450
451fn value_get_by_segments(value: &Value, segs: &[PathSegment]) -> Option<Value> {
452 let mut cur = value;
453 for seg in segs {
454 match seg {
455 PathSegment::Key(k) => cur = cur.as_object()?.get(k)?,
456 PathSegment::Index(i) => cur = cur.as_array()?.get(*i)?,
457 }
458 }
459 Some(cur.clone())
460}
461
462fn value_set_by_segments(target: &mut Value, segs: &[PathSegment], value: Value) {
463 fn set_at(cur: &mut Value, segs: &[PathSegment], idx: usize, value: Value) {
464 if idx >= segs.len() {
465 *cur = value;
466 return;
467 }
468 match &segs[idx] {
469 PathSegment::Key(k) => {
470 if !cur.is_object() {
471 *cur = Value::Object(Map::new());
472 }
473 let obj = cur.as_object_mut().expect("object ensured");
474 let entry = obj.entry(k.clone()).or_insert(Value::Null);
475 set_at(entry, segs, idx + 1, value);
476 }
477 PathSegment::Index(i) => {
478 if !cur.is_array() {
479 *cur = Value::Array(Vec::new());
480 }
481 let arr = cur.as_array_mut().expect("array ensured");
482 if arr.len() <= *i {
483 arr.resize(*i + 1, Value::Null);
484 }
485 set_at(&mut arr[*i], segs, idx + 1, value);
486 }
487 }
488 }
489
490 set_at(target, segs, 0, value);
491}
492
493fn value_take_by_segments(target: &mut Value, segs: &[PathSegment]) -> Option<Value> {
494 if segs.is_empty() {
495 return None;
496 }
497
498 fn take_at(cur: &mut Value, segs: &[PathSegment], idx: usize) -> Option<Value> {
499 if idx + 1 == segs.len() {
500 return match &segs[idx] {
501 PathSegment::Key(k) => cur.as_object_mut()?.remove(k),
502 PathSegment::Index(i) => {
503 let arr = cur.as_array_mut()?;
504 if *i >= arr.len() {
505 None
506 } else {
507 Some(std::mem::replace(&mut arr[*i], Value::Null))
508 }
509 }
510 };
511 }
512
513 match &segs[idx] {
514 PathSegment::Key(k) => take_at(cur.as_object_mut()?.get_mut(k)?, segs, idx + 1),
515 PathSegment::Index(i) => take_at(cur.as_array_mut()?.get_mut(*i)?, segs, idx + 1),
516 }
517 }
518
519 take_at(target, segs, 0)
520}
521
522fn value_to_string(v: Value) -> String {
523 match v {
524 Value::String(s) => s,
525 Value::Null => String::new(),
526 other => other.to_string(),
527 }
528}
529
530fn value_to_bool(v: Value) -> bool {
531 match v {
532 Value::Bool(b) => b,
533 Value::Number(n) => n.as_f64().map(|x| x != 0.0).unwrap_or(false),
534 Value::String(s) => {
535 let norm = s.trim().to_ascii_lowercase();
536 matches!(norm.as_str(), "1" | "true" | "yes" | "y" | "on")
537 }
538 Value::Array(a) => !a.is_empty(),
539 Value::Object(o) => !o.is_empty(),
540 Value::Null => false,
541 }
542}
543
544fn cast_value(value: Value, ty: &str) -> Value {
545 match ty.trim().to_ascii_lowercase().as_str() {
546 "string" | "str" | "text" => Value::String(value_to_string(value)),
547 "int" | "i64" | "u64" | "integer" => json!(as_i64(&value).unwrap_or(0)),
548 "float" | "f64" | "number" => json!(as_f64(&value).unwrap_or(0.0)),
549 "bool" | "boolean" => Value::Bool(value_to_bool(value)),
550 "datetime" | "timestamp" => Value::String(value_to_string(value)),
551 _ => value,
552 }
553}
554
555#[derive(Debug, Clone, Default)]
556struct RegexOpts {
557 case_insensitive: bool,
558 multi_line: bool,
559 dot_matches_new_line: bool,
560 unicode: bool,
561 all: bool,
562}
563
564fn parse_regex_opts(raw: Option<&Value>) -> RegexOpts {
565 let mut out = RegexOpts {
566 unicode: true,
567 all: true,
568 ..RegexOpts::default()
569 };
570
571 let Some(raw) = raw else {
572 return out;
573 };
574
575 match raw {
576 Value::Bool(b) => out.all = *b,
577 Value::String(flags) => {
578 for ch in flags.chars() {
579 match ch {
580 'i' | 'I' => out.case_insensitive = true,
581 'm' | 'M' => out.multi_line = true,
582 's' | 'S' => out.dot_matches_new_line = true,
583 'u' | 'U' => out.unicode = true,
584 'g' | 'G' => out.all = true,
585 _ => {}
586 }
587 }
588 }
589 Value::Object(map) => {
590 out.case_insensitive = map
591 .get("case_insensitive")
592 .or_else(|| map.get("ignore_case"))
593 .map(|v| value_to_bool(v.clone()))
594 .unwrap_or(false);
595 out.multi_line = map
596 .get("multi_line")
597 .or_else(|| map.get("multiline"))
598 .map(|v| value_to_bool(v.clone()))
599 .unwrap_or(false);
600 out.dot_matches_new_line = map
601 .get("dot_matches_new_line")
602 .or_else(|| map.get("dot_all"))
603 .map(|v| value_to_bool(v.clone()))
604 .unwrap_or(false);
605 out.unicode = map
606 .get("unicode")
607 .map(|v| value_to_bool(v.clone()))
608 .unwrap_or(true);
609 out.all = map
610 .get("all")
611 .map(|v| value_to_bool(v.clone()))
612 .unwrap_or(true);
613 }
614 _ => {}
615 }
616
617 out
618}
619
620fn build_regex(pattern: &str, opts: &RegexOpts) -> Result<regex::Regex, EvalError> {
621 let mut builder = RegexBuilder::new(pattern);
622 builder
623 .case_insensitive(opts.case_insensitive)
624 .multi_line(opts.multi_line)
625 .dot_matches_new_line(opts.dot_matches_new_line)
626 .unicode(opts.unicode);
627 builder
628 .build()
629 .map_err(|e| EvalError::new(format!("invalid regex '{pattern}': {e}")))
630}
631
632fn cmp_numbers_or_strings(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
633 match (as_f64(a), as_f64(b)) {
634 (Some(x), Some(y)) => x.partial_cmp(&y),
635 _ => Some(a.to_string().cmp(&b.to_string())),
636 }
637}
638
639#[derive(Debug, Clone)]
640enum AggExpr {
641 CountAll,
642 CountField(Vec<PathSegment>),
643 Sum(Vec<PathSegment>),
644 Avg(Vec<PathSegment>),
645 Min(Vec<PathSegment>),
646 Max(Vec<PathSegment>),
647}
648
649fn parse_agg_expr(raw: &str) -> Option<AggExpr> {
650 let raw = raw.trim();
651 let text = raw.to_ascii_lowercase();
652 if text == "count(*)" {
653 return Some(AggExpr::CountAll);
654 }
655 let parse_fn = |name: &str| -> Option<Vec<PathSegment>> {
656 let prefix = format!("{name}(");
657 if !text.starts_with(&prefix) || !text.ends_with(')') {
658 return None;
659 }
660 let inner = raw[prefix.len()..raw.len() - 1].trim();
661 parse_path(inner)
662 };
663
664 parse_fn("count")
665 .map(AggExpr::CountField)
666 .or_else(|| parse_fn("sum").map(AggExpr::Sum))
667 .or_else(|| parse_fn("avg").map(AggExpr::Avg))
668 .or_else(|| parse_fn("min").map(AggExpr::Min))
669 .or_else(|| parse_fn("max").map(AggExpr::Max))
670}
671
672fn op_var(args: &[Value], ctx: &mut Context) -> EvalResult {
673 let raw = evaluate(require_arg(args, 0, "var")?, ctx)?;
674 let Some(path) = raw.as_str() else {
675 return Ok(Value::Null);
676 };
677 Ok(read_state_path(ctx.state, path).unwrap_or(Value::Null))
678}
679
680fn op_set(args: &[Value], ctx: &mut Context) -> EvalResult {
681 let path_val = evaluate(require_arg(args, 0, "set")?, ctx)?;
682 let value = evaluate(require_arg(args, 1, "set")?, ctx)?;
683 let Some(path) = path_val.as_str() else {
684 return Err(EvalError::new("set path must evaluate to string"));
685 };
686 write_state_path(ctx.state, path, value.clone());
687 Ok(value)
688}
689
690fn op_add(args: &[Value], ctx: &mut Context) -> EvalResult {
691 let mut total = 0.0;
692 for arg in args {
693 let v = evaluate(arg, ctx)?;
694 total += as_f64(&v).unwrap_or(0.0);
695 }
696 Ok(json!(total))
697}
698
699fn op_sub(args: &[Value], ctx: &mut Context) -> EvalResult {
700 let first = evaluate(require_arg(args, 0, "-")?, ctx)?;
701 let second = evaluate(require_arg(args, 1, "-")?, ctx)?;
702 Ok(json!(
703 as_f64(&first).unwrap_or(0.0) - as_f64(&second).unwrap_or(0.0)
704 ))
705}
706
707fn op_mul(args: &[Value], ctx: &mut Context) -> EvalResult {
708 let mut product = 1.0;
709 for arg in args {
710 let v = evaluate(arg, ctx)?;
711 product *= as_f64(&v).unwrap_or(0.0);
712 }
713 Ok(json!(product))
714}
715
716fn op_div(args: &[Value], ctx: &mut Context) -> EvalResult {
717 let a = evaluate(require_arg(args, 0, "/")?, ctx)?;
718 let b = evaluate(require_arg(args, 1, "/")?, ctx)?;
719 let denom = as_f64(&b).unwrap_or(0.0);
720 if denom == 0.0 {
721 return Err(EvalError::new("div by zero"));
722 }
723 Ok(json!(as_f64(&a).unwrap_or(0.0) / denom))
724}
725
726fn op_mod(args: &[Value], ctx: &mut Context) -> EvalResult {
727 let a = evaluate(require_arg(args, 0, "%")?, ctx)?;
728 let b = evaluate(require_arg(args, 1, "%")?, ctx)?;
729 let denom = as_i64(&b).unwrap_or(0);
730 if denom == 0 {
731 return Err(EvalError::new("mod by zero"));
732 }
733 Ok(json!(as_i64(&a).unwrap_or(0) % denom))
734}
735
736fn op_eq(args: &[Value], ctx: &mut Context) -> EvalResult {
737 let a = evaluate(require_arg(args, 0, "==")?, ctx)?;
738 let b = evaluate(require_arg(args, 1, "==")?, ctx)?;
739 Ok(Value::Bool(a == b))
740}
741
742fn op_neq(args: &[Value], ctx: &mut Context) -> EvalResult {
743 let a = evaluate(require_arg(args, 0, "!=")?, ctx)?;
744 let b = evaluate(require_arg(args, 1, "!=")?, ctx)?;
745 Ok(Value::Bool(a != b))
746}
747
748fn op_gt(args: &[Value], ctx: &mut Context) -> EvalResult {
749 let a = evaluate(require_arg(args, 0, ">")?, ctx)?;
750 let b = evaluate(require_arg(args, 1, ">")?, ctx)?;
751 Ok(Value::Bool(
752 cmp_numbers_or_strings(&a, &b).is_some_and(|o| o.is_gt()),
753 ))
754}
755
756fn op_lt(args: &[Value], ctx: &mut Context) -> EvalResult {
757 let a = evaluate(require_arg(args, 0, "<")?, ctx)?;
758 let b = evaluate(require_arg(args, 1, "<")?, ctx)?;
759 Ok(Value::Bool(
760 cmp_numbers_or_strings(&a, &b).is_some_and(|o| o.is_lt()),
761 ))
762}
763
764fn op_gte(args: &[Value], ctx: &mut Context) -> EvalResult {
765 let a = evaluate(require_arg(args, 0, ">=")?, ctx)?;
766 let b = evaluate(require_arg(args, 1, ">=")?, ctx)?;
767 Ok(Value::Bool(
768 cmp_numbers_or_strings(&a, &b).is_some_and(|o| !o.is_lt()),
769 ))
770}
771
772fn op_lte(args: &[Value], ctx: &mut Context) -> EvalResult {
773 let a = evaluate(require_arg(args, 0, "<=")?, ctx)?;
774 let b = evaluate(require_arg(args, 1, "<=")?, ctx)?;
775 Ok(Value::Bool(
776 cmp_numbers_or_strings(&a, &b).is_some_and(|o| !o.is_gt()),
777 ))
778}
779
780fn op_if(args: &[Value], ctx: &mut Context) -> EvalResult {
781 let cond = evaluate(require_arg(args, 0, "if")?, ctx)?;
782 if truthy(&cond) {
783 if let Some(t) = args.get(1) {
784 evaluate(t, ctx)
785 } else {
786 Ok(Value::Null)
787 }
788 } else if let Some(f) = args.get(2) {
789 evaluate(f, ctx)
790 } else {
791 Ok(Value::Null)
792 }
793}
794
795fn op_and(args: &[Value], ctx: &mut Context) -> EvalResult {
796 let mut last = Value::Bool(true);
797 for arg in args {
798 last = evaluate(arg, ctx)?;
799 if !truthy(&last) {
800 return Ok(Value::Bool(false));
801 }
802 }
803 Ok(last)
804}
805
806fn op_or(args: &[Value], ctx: &mut Context) -> EvalResult {
807 for arg in args {
808 let v = evaluate(arg, ctx)?;
809 if truthy(&v) {
810 return Ok(v);
811 }
812 }
813 Ok(Value::Bool(false))
814}
815
816fn op_not(args: &[Value], ctx: &mut Context) -> EvalResult {
817 let v = evaluate(require_arg(args, 0, "!")?, ctx)?;
818 Ok(Value::Bool(!truthy(&v)))
819}
820
821fn op_concat(args: &[Value], ctx: &mut Context) -> EvalResult {
822 let mut out = String::new();
823 for arg in args {
824 let v = evaluate(arg, ctx)?;
825 match v {
826 Value::String(s) => out.push_str(&s),
827 Value::Null => {}
828 other => out.push_str(&other.to_string()),
829 }
830 }
831 Ok(Value::String(out))
832}
833
834fn op_lower(args: &[Value], ctx: &mut Context) -> EvalResult {
835 let v = evaluate(require_arg(args, 0, "util.lower")?, ctx)?;
836 Ok(Value::String(
837 v.as_str().unwrap_or(&v.to_string()).to_lowercase(),
838 ))
839}
840
841fn op_upper(args: &[Value], ctx: &mut Context) -> EvalResult {
842 let v = evaluate(require_arg(args, 0, "util.upper")?, ctx)?;
843 Ok(Value::String(
844 v.as_str().unwrap_or(&v.to_string()).to_uppercase(),
845 ))
846}
847
848fn op_contains(args: &[Value], ctx: &mut Context) -> EvalResult {
849 let hay = evaluate(require_arg(args, 0, "util.contains")?, ctx)?;
850 let needle = evaluate(require_arg(args, 1, "util.contains")?, ctx)?;
851 let result = match hay {
852 Value::String(s) => s.contains(needle.as_str().unwrap_or(&needle.to_string())),
853 Value::Array(arr) => arr.contains(&needle),
854 _ => false,
855 };
856 Ok(Value::Bool(result))
857}
858
859fn op_template(args: &[Value], ctx: &mut Context) -> EvalResult {
860 let fmt = evaluate(require_arg(args, 0, "util.template")?, ctx)?;
861 let mut out = fmt.as_str().unwrap_or(&fmt.to_string()).to_string();
862 for arg in &args[1..] {
863 let v = evaluate(arg, ctx)?;
864 let rep = match v {
865 Value::String(s) => s,
866 Value::Null => String::new(),
867 other => other.to_string(),
868 };
869 if out.contains("{}") {
870 out = out.replacen("{}", &rep, 1);
871 }
872 }
873 Ok(Value::String(out))
874}
875
876fn op_to_int(args: &[Value], ctx: &mut Context) -> EvalResult {
877 let v = evaluate(require_arg(args, 0, "util.to_int")?, ctx)?;
878 Ok(json!(as_i64(&v).unwrap_or(0)))
879}
880
881fn op_to_float(args: &[Value], ctx: &mut Context) -> EvalResult {
882 let v = evaluate(require_arg(args, 0, "util.to_float")?, ctx)?;
883 Ok(json!(as_f64(&v).unwrap_or(0.0)))
884}
885
886fn op_to_string(args: &[Value], ctx: &mut Context) -> EvalResult {
887 let v = evaluate(require_arg(args, 0, "util.to_string")?, ctx)?;
888 Ok(Value::String(match v {
889 Value::String(s) => s,
890 Value::Null => String::new(),
891 other => other.to_string(),
892 }))
893}
894
895fn op_trim(args: &[Value], ctx: &mut Context) -> EvalResult {
896 let v = evaluate(require_arg(args, 0, "util.trim")?, ctx)?;
897 Ok(Value::String(
898 v.as_str().unwrap_or(&v.to_string()).trim().to_string(),
899 ))
900}
901
902fn op_str_len(args: &[Value], ctx: &mut Context) -> EvalResult {
903 let v = evaluate(require_arg(args, 0, "util.str_len")?, ctx)?;
904 Ok(json!(v.as_str().unwrap_or(&v.to_string()).chars().count()))
905}
906
907fn op_util_get_path(args: &[Value], ctx: &mut Context) -> EvalResult {
908 let source = evaluate(require_arg(args, 0, "util.get_path")?, ctx)?;
909 let path = evaluate(require_arg(args, 1, "util.get_path")?, ctx)?;
910 let fallback = if let Some(arg) = args.get(2) {
911 evaluate(arg, ctx)?
912 } else {
913 Value::Null
914 };
915 let Some(path) = path.as_str() else {
916 return Ok(fallback);
917 };
918 Ok(value_get_path(&source, path).unwrap_or(fallback))
919}
920
921fn op_util_one_of(args: &[Value], ctx: &mut Context) -> EvalResult {
922 let source = evaluate(require_arg(args, 0, "util.one_of")?, ctx)?;
923 let paths = evaluate(require_arg(args, 1, "util.one_of")?, ctx)?;
924 let fallback = if let Some(arg) = args.get(2) {
925 evaluate(arg, ctx)?
926 } else {
927 Value::Null
928 };
929
930 if let Some(list) = paths.as_array() {
931 for item in list {
932 if let Some(path) = item.as_str() {
933 if let Some(v) = value_get_path(&source, path) {
934 return Ok(v);
935 }
936 }
937 }
938 } else if let Some(path) = paths.as_str() {
939 if let Some(v) = value_get_path(&source, path) {
940 return Ok(v);
941 }
942 }
943
944 Ok(fallback)
945}
946
947fn op_util_pick(args: &[Value], ctx: &mut Context) -> EvalResult {
948 let source = evaluate(require_arg(args, 0, "util.pick")?, ctx)?;
949 let paths = evaluate(require_arg(args, 1, "util.pick")?, ctx)?;
950 let mut out = Value::Object(Map::new());
951
952 if let Some(list) = paths.as_array() {
953 for item in list {
954 let Some(path) = item.as_str() else {
955 continue;
956 };
957 let Some(segs) = parse_path(path) else {
958 continue;
959 };
960 if let Some(v) = value_get_by_segments(&source, &segs) {
961 value_set_by_segments(&mut out, &segs, v);
962 }
963 }
964 }
965
966 Ok(out)
967}
968
969fn op_util_exists(args: &[Value], ctx: &mut Context) -> EvalResult {
970 let source = evaluate(require_arg(args, 0, "util.exists")?, ctx)?;
971 let path = evaluate(require_arg(args, 1, "util.exists")?, ctx)?;
972 let Some(path) = path.as_str() else {
973 return Ok(Value::Bool(false));
974 };
975 Ok(Value::Bool(value_get_path(&source, path).is_some()))
976}
977
978fn op_util_omit(args: &[Value], ctx: &mut Context) -> EvalResult {
979 let source = evaluate(require_arg(args, 0, "util.omit")?, ctx)?;
980 let paths = evaluate(require_arg(args, 1, "util.omit")?, ctx)?;
981 let mut out = source;
982
983 if let Some(list) = paths.as_array() {
984 for item in list {
985 let Some(path) = item.as_str() else {
986 continue;
987 };
988 let Some(segs) = parse_path(path) else {
989 continue;
990 };
991 let _ = value_take_by_segments(&mut out, &segs);
992 }
993 }
994
995 Ok(out)
996}
997
998fn op_util_as_array(args: &[Value], ctx: &mut Context) -> EvalResult {
999 let value = evaluate(require_arg(args, 0, "util.as_array")?, ctx)?;
1000 match value {
1001 Value::Array(arr) => Ok(Value::Array(arr)),
1002 Value::Null => Ok(Value::Array(Vec::new())),
1003 other => Ok(Value::Array(vec![other])),
1004 }
1005}
1006
1007fn op_util_to_bool(args: &[Value], ctx: &mut Context) -> EvalResult {
1008 let value = evaluate(require_arg(args, 0, "util.to_bool")?, ctx)?;
1009 Ok(Value::Bool(value_to_bool(value)))
1010}
1011
1012fn op_util_replace(args: &[Value], ctx: &mut Context) -> EvalResult {
1013 let source = value_to_string(evaluate(require_arg(args, 0, "util.replace")?, ctx)?);
1014 let from = value_to_string(evaluate(require_arg(args, 1, "util.replace")?, ctx)?);
1015 let to = value_to_string(evaluate(require_arg(args, 2, "util.replace")?, ctx)?);
1016 if from.is_empty() {
1017 return Ok(Value::String(source));
1018 }
1019 Ok(Value::String(source.replacen(&from, &to, 1)))
1020}
1021
1022fn op_util_replace_all(args: &[Value], ctx: &mut Context) -> EvalResult {
1023 let source = value_to_string(evaluate(require_arg(args, 0, "util.replace_all")?, ctx)?);
1024 let from = value_to_string(evaluate(require_arg(args, 1, "util.replace_all")?, ctx)?);
1025 let to = value_to_string(evaluate(require_arg(args, 2, "util.replace_all")?, ctx)?);
1026 if from.is_empty() {
1027 return Ok(Value::String(source));
1028 }
1029 Ok(Value::String(source.replace(&from, &to)))
1030}
1031
1032fn op_util_split(args: &[Value], ctx: &mut Context) -> EvalResult {
1033 let source = value_to_string(evaluate(require_arg(args, 0, "util.split")?, ctx)?);
1034 let delim = value_to_string(evaluate(require_arg(args, 1, "util.split")?, ctx)?);
1035 if delim.is_empty() {
1036 let parts: Vec<Value> = source
1037 .chars()
1038 .map(|c| Value::String(c.to_string()))
1039 .collect();
1040 return Ok(Value::Array(parts));
1041 }
1042 Ok(Value::Array(
1043 source
1044 .split(&delim)
1045 .map(|s| Value::String(s.to_string()))
1046 .collect(),
1047 ))
1048}
1049
1050fn op_util_join(args: &[Value], ctx: &mut Context) -> EvalResult {
1051 let values = evaluate(require_arg(args, 0, "util.join")?, ctx)?;
1052 let delim = value_to_string(evaluate(require_arg(args, 1, "util.join")?, ctx)?);
1053 let Some(arr) = values.as_array() else {
1054 return Ok(Value::String(value_to_string(values)));
1055 };
1056 let out = arr
1057 .iter()
1058 .cloned()
1059 .map(value_to_string)
1060 .collect::<Vec<_>>()
1061 .join(&delim);
1062 Ok(Value::String(out))
1063}
1064
1065fn op_util_regex_match(args: &[Value], ctx: &mut Context) -> EvalResult {
1066 let source = value_to_string(evaluate(require_arg(args, 0, "util.regex_match")?, ctx)?);
1067 let pattern = value_to_string(evaluate(require_arg(args, 1, "util.regex_match")?, ctx)?);
1068 let opts_val = if let Some(v) = args.get(2) {
1069 Some(evaluate(v, ctx)?)
1070 } else {
1071 None
1072 };
1073 let opts = parse_regex_opts(opts_val.as_ref());
1074 let re = build_regex(&pattern, &opts)?;
1075 Ok(Value::Bool(re.is_match(&source)))
1076}
1077
1078fn op_util_regex_find(args: &[Value], ctx: &mut Context) -> EvalResult {
1079 let source = value_to_string(evaluate(require_arg(args, 0, "util.regex_find")?, ctx)?);
1080 let pattern = value_to_string(evaluate(require_arg(args, 1, "util.regex_find")?, ctx)?);
1081 let opts_val = if let Some(v) = args.get(2) {
1082 Some(evaluate(v, ctx)?)
1083 } else {
1084 None
1085 };
1086 let opts = parse_regex_opts(opts_val.as_ref());
1087 let re = build_regex(&pattern, &opts)?;
1088 Ok(re
1089 .find(&source)
1090 .map(|m| Value::String(m.as_str().to_string()))
1091 .unwrap_or(Value::Null))
1092}
1093
1094fn op_util_regex_find_all(args: &[Value], ctx: &mut Context) -> EvalResult {
1095 let source = value_to_string(evaluate(require_arg(args, 0, "util.regex_find_all")?, ctx)?);
1096 let pattern = value_to_string(evaluate(require_arg(args, 1, "util.regex_find_all")?, ctx)?);
1097 let opts_val = if let Some(v) = args.get(2) {
1098 Some(evaluate(v, ctx)?)
1099 } else {
1100 None
1101 };
1102 let opts = parse_regex_opts(opts_val.as_ref());
1103 let re = build_regex(&pattern, &opts)?;
1104 Ok(Value::Array(
1105 re.find_iter(&source)
1106 .map(|m| Value::String(m.as_str().to_string()))
1107 .collect(),
1108 ))
1109}
1110
1111fn op_util_regex_replace(args: &[Value], ctx: &mut Context) -> EvalResult {
1112 let source = value_to_string(evaluate(require_arg(args, 0, "util.regex_replace")?, ctx)?);
1113 let pattern = value_to_string(evaluate(require_arg(args, 1, "util.regex_replace")?, ctx)?);
1114 let replacement = value_to_string(evaluate(require_arg(args, 2, "util.regex_replace")?, ctx)?);
1115 let opts_val = if let Some(v) = args.get(3) {
1116 Some(evaluate(v, ctx)?)
1117 } else {
1118 None
1119 };
1120 let opts = parse_regex_opts(opts_val.as_ref());
1121 let re = build_regex(&pattern, &opts)?;
1122
1123 let out = if opts.all {
1124 re.replace_all(&source, replacement.as_str()).to_string()
1125 } else {
1126 re.replace(&source, replacement.as_str()).to_string()
1127 };
1128 Ok(Value::String(out))
1129}
1130
1131fn op_data_select(args: &[Value], ctx: &mut Context) -> EvalResult {
1132 let rows = evaluate(require_arg(args, 0, "data.select")?, ctx)?;
1133 let fields = evaluate(require_arg(args, 1, "data.select")?, ctx)?;
1134 let arr = rows
1135 .as_array()
1136 .ok_or_else(|| EvalError::new("data.select first arg must be array"))?;
1137 let fields = fields
1138 .as_array()
1139 .ok_or_else(|| EvalError::new("data.select second arg must be array"))?;
1140
1141 let mut parsed_fields: Vec<Vec<PathSegment>> = Vec::new();
1142 for field in fields {
1143 if let Some(path) = field.as_str() {
1144 if let Some(segs) = parse_path(path) {
1145 parsed_fields.push(segs);
1146 }
1147 }
1148 }
1149
1150 let mut out = Vec::with_capacity(arr.len());
1151 for row in arr {
1152 let mut next = Value::Object(Map::new());
1153 for segs in &parsed_fields {
1154 let val = value_get_by_segments(row, segs).unwrap_or(Value::Null);
1155 value_set_by_segments(&mut next, segs, val);
1156 }
1157 out.push(next);
1158 }
1159 Ok(Value::Array(out))
1160}
1161
1162fn op_data_rename(args: &[Value], ctx: &mut Context) -> EvalResult {
1163 let rows = evaluate(require_arg(args, 0, "data.rename")?, ctx)?;
1164 let mapping = evaluate(require_arg(args, 1, "data.rename")?, ctx)?;
1165 let arr = rows
1166 .as_array()
1167 .ok_or_else(|| EvalError::new("data.rename first arg must be array"))?;
1168 let mapping = mapping
1169 .as_object()
1170 .ok_or_else(|| EvalError::new("data.rename second arg must be object"))?;
1171
1172 let mut pairs = Vec::new();
1173 for (from, to) in mapping {
1174 let Some(to_path) = to.as_str() else {
1175 continue;
1176 };
1177 let (Some(from_segs), Some(to_segs)) = (parse_path(from), parse_path(to_path)) else {
1178 continue;
1179 };
1180 pairs.push((from_segs, to_segs));
1181 }
1182
1183 let mut out = Vec::with_capacity(arr.len());
1184 for row in arr {
1185 let mut next = row.clone();
1186 for (from, to) in &pairs {
1187 if let Some(v) = value_take_by_segments(&mut next, from) {
1188 value_set_by_segments(&mut next, to, v);
1189 }
1190 }
1191 out.push(next);
1192 }
1193
1194 Ok(Value::Array(out))
1195}
1196
1197fn op_data_cast(args: &[Value], ctx: &mut Context) -> EvalResult {
1198 let rows = evaluate(require_arg(args, 0, "data.cast")?, ctx)?;
1199 let mapping = evaluate(require_arg(args, 1, "data.cast")?, ctx)?;
1200 let arr = rows
1201 .as_array()
1202 .ok_or_else(|| EvalError::new("data.cast first arg must be array"))?;
1203 let mapping = mapping
1204 .as_object()
1205 .ok_or_else(|| EvalError::new("data.cast second arg must be object"))?;
1206
1207 let mut casts = Vec::new();
1208 for (field, ty) in mapping {
1209 let Some(ty_name) = ty.as_str() else {
1210 continue;
1211 };
1212 let Some(segs) = parse_path(field) else {
1213 continue;
1214 };
1215 casts.push((segs, ty_name.to_string()));
1216 }
1217
1218 let mut out = Vec::with_capacity(arr.len());
1219 for row in arr {
1220 let mut next = row.clone();
1221 for (segs, ty) in &casts {
1222 if let Some(v) = value_get_by_segments(&next, segs) {
1223 let casted = cast_value(v, ty);
1224 value_set_by_segments(&mut next, segs, casted);
1225 }
1226 }
1227 out.push(next);
1228 }
1229 Ok(Value::Array(out))
1230}
1231
1232fn op_data_chunk(args: &[Value], ctx: &mut Context) -> EvalResult {
1233 let rows = evaluate(require_arg(args, 0, "data.chunk")?, ctx)?;
1234 let size = evaluate(require_arg(args, 1, "data.chunk")?, ctx)?;
1235 let arr = rows
1236 .as_array()
1237 .ok_or_else(|| EvalError::new("data.chunk first arg must be array"))?;
1238 let n = as_i64(&size).unwrap_or(0);
1239 if n <= 0 {
1240 return Err(EvalError::new("data.chunk size must be > 0"));
1241 }
1242 let n = n as usize;
1243
1244 let mut out = Vec::new();
1245 for chunk in arr.chunks(n) {
1246 out.push(Value::Array(chunk.to_vec()));
1247 }
1248 Ok(Value::Array(out))
1249}
1250
1251fn op_data_flat_map(args: &[Value], ctx: &mut Context) -> EvalResult {
1252 let rows = evaluate(require_arg(args, 0, "data.flat_map")?, ctx)?;
1253 let path = evaluate(require_arg(args, 1, "data.flat_map")?, ctx)?;
1254 let arr = rows
1255 .as_array()
1256 .ok_or_else(|| EvalError::new("data.flat_map first arg must be array"))?;
1257 let Some(path) = path.as_str() else {
1258 return Err(EvalError::new(
1259 "data.flat_map second arg must be path string",
1260 ));
1261 };
1262 let segs = parse_path(path).ok_or_else(|| EvalError::new("data.flat_map path is invalid"))?;
1263
1264 let mut out = Vec::new();
1265 for row in arr {
1266 let Some(value) = value_get_by_segments(row, &segs) else {
1267 continue;
1268 };
1269 match value {
1270 Value::Array(items) => {
1271 for item in items {
1272 let mut next = row.clone();
1273 value_set_by_segments(&mut next, &segs, item);
1274 out.push(next);
1275 }
1276 }
1277 Value::Null => {}
1278 other => {
1279 let mut next = row.clone();
1280 value_set_by_segments(&mut next, &segs, other);
1281 out.push(next);
1282 }
1283 }
1284 }
1285
1286 Ok(Value::Array(out))
1287}
1288
1289fn op_data_distinct(args: &[Value], ctx: &mut Context) -> EvalResult {
1290 let rows = evaluate(require_arg(args, 0, "data.distinct")?, ctx)?;
1291 let arr = rows
1292 .as_array()
1293 .ok_or_else(|| EvalError::new("data.distinct first arg must be array"))?;
1294
1295 let key_paths = if let Some(arg) = args.get(1) {
1296 let keys = evaluate(arg, ctx)?;
1297 keys.as_array().map(|list| {
1298 list.iter()
1299 .filter_map(|v| v.as_str())
1300 .filter_map(parse_path)
1301 .collect::<Vec<_>>()
1302 })
1303 } else {
1304 None
1305 };
1306
1307 let mut seen = HashSet::new();
1308 let mut out = Vec::new();
1309 for row in arr {
1310 let sig = if let Some(paths) = &key_paths {
1311 let key_vals: Vec<Value> = paths
1312 .iter()
1313 .map(|segs| value_get_by_segments(row, segs).unwrap_or(Value::Null))
1314 .collect();
1315 serde_json::to_string(&key_vals).unwrap_or_else(|_| "[]".to_string())
1316 } else {
1317 serde_json::to_string(row).unwrap_or_else(|_| "null".to_string())
1318 };
1319
1320 if seen.insert(sig) {
1321 out.push(row.clone());
1322 }
1323 }
1324
1325 Ok(Value::Array(out))
1326}
1327
1328fn op_data_sort_by(args: &[Value], ctx: &mut Context) -> EvalResult {
1329 let rows = evaluate(require_arg(args, 0, "data.sort_by")?, ctx)?;
1330 let spec = evaluate(require_arg(args, 1, "data.sort_by")?, ctx)?;
1331 let arr = rows
1332 .as_array()
1333 .ok_or_else(|| EvalError::new("data.sort_by first arg must be array"))?;
1334 let spec_arr = spec
1335 .as_array()
1336 .ok_or_else(|| EvalError::new("data.sort_by second arg must be array"))?;
1337
1338 let mut order_specs: Vec<(Vec<PathSegment>, bool)> = Vec::new();
1339 for s in spec_arr {
1340 match s {
1341 Value::String(path) => {
1342 if let Some(segs) = parse_path(path) {
1343 order_specs.push((segs, true));
1344 }
1345 }
1346 Value::Object(obj) => {
1347 let Some(col) = obj.get("col").and_then(Value::as_str) else {
1348 continue;
1349 };
1350 let asc = !obj
1351 .get("dir")
1352 .and_then(Value::as_str)
1353 .map(|d| d.eq_ignore_ascii_case("desc"))
1354 .unwrap_or(false);
1355 if let Some(segs) = parse_path(col) {
1356 order_specs.push((segs, asc));
1357 }
1358 }
1359 _ => {}
1360 }
1361 }
1362
1363 let mut out = arr.clone();
1364 out.sort_by(|a, b| {
1365 for (segs, asc) in &order_specs {
1366 let av = value_get_by_segments(a, segs).unwrap_or(Value::Null);
1367 let bv = value_get_by_segments(b, segs).unwrap_or(Value::Null);
1368 let ord = cmp_numbers_or_strings(&av, &bv).unwrap_or(std::cmp::Ordering::Equal);
1369 if ord != std::cmp::Ordering::Equal {
1370 return if *asc { ord } else { ord.reverse() };
1371 }
1372 }
1373 std::cmp::Ordering::Equal
1374 });
1375
1376 Ok(Value::Array(out))
1377}
1378
1379fn op_data_group_by(args: &[Value], ctx: &mut Context) -> EvalResult {
1380 let rows = evaluate(require_arg(args, 0, "data.group_by")?, ctx)?;
1381 let keys = evaluate(require_arg(args, 1, "data.group_by")?, ctx)?;
1382 let arr = rows
1383 .as_array()
1384 .ok_or_else(|| EvalError::new("data.group_by first arg must be array"))?;
1385 let key_list = keys
1386 .as_array()
1387 .ok_or_else(|| EvalError::new("data.group_by second arg must be array"))?;
1388
1389 let mut parsed_keys: Vec<(String, Vec<PathSegment>)> = Vec::new();
1390 for k in key_list {
1391 let Some(path) = k.as_str() else {
1392 continue;
1393 };
1394 if let Some(segs) = parse_path(path) {
1395 parsed_keys.push((path.to_string(), segs));
1396 }
1397 }
1398
1399 let mut order: Vec<(Map<String, Value>, Vec<Value>)> = Vec::new();
1400 let mut index: HashMap<String, usize> = HashMap::new();
1401 for row in arr {
1402 let mut key_obj = Map::new();
1403 let mut key_vals = Vec::new();
1404 for (raw, segs) in &parsed_keys {
1405 let v = value_get_by_segments(row, segs).unwrap_or(Value::Null);
1406 key_vals.push(v.clone());
1407 key_obj.insert(raw.clone(), v);
1408 }
1409 let sig = serde_json::to_string(&key_vals).unwrap_or_else(|_| "[]".to_string());
1410 if let Some(i) = index.get(&sig).copied() {
1411 order[i].1.push(row.clone());
1412 } else {
1413 index.insert(sig, order.len());
1414 order.push((key_obj, vec![row.clone()]));
1415 }
1416 }
1417
1418 let mut out = Vec::with_capacity(order.len());
1419 for (keys_obj, grouped_rows) in order {
1420 let mut obj = keys_obj;
1421 obj.insert("_rows".to_string(), Value::Array(grouped_rows));
1422 out.push(Value::Object(obj));
1423 }
1424 Ok(Value::Array(out))
1425}
1426
1427fn op_data_aggregate(args: &[Value], ctx: &mut Context) -> EvalResult {
1428 let groups = evaluate(require_arg(args, 0, "data.aggregate")?, ctx)?;
1429 let spec = evaluate(require_arg(args, 1, "data.aggregate")?, ctx)?;
1430 let groups_arr = groups
1431 .as_array()
1432 .ok_or_else(|| EvalError::new("data.aggregate first arg must be array"))?;
1433 let spec_obj = spec
1434 .as_object()
1435 .ok_or_else(|| EvalError::new("data.aggregate second arg must be object"))?;
1436
1437 let mut parsed_spec: Vec<(String, AggExpr)> = Vec::new();
1438 for (out_key, expr_val) in spec_obj {
1439 let Some(expr) = expr_val.as_str() else {
1440 continue;
1441 };
1442 if let Some(parsed) = parse_agg_expr(expr) {
1443 parsed_spec.push((out_key.clone(), parsed));
1444 }
1445 }
1446
1447 let mut out = Vec::with_capacity(groups_arr.len());
1448 for group in groups_arr {
1449 let Some(group_obj) = group.as_object() else {
1450 continue;
1451 };
1452 let rows = group_obj
1453 .get("_rows")
1454 .and_then(Value::as_array)
1455 .cloned()
1456 .unwrap_or_default();
1457
1458 let mut next = Map::new();
1459 for (k, v) in group_obj {
1460 if k != "_rows" {
1461 next.insert(k.clone(), v.clone());
1462 }
1463 }
1464
1465 for (out_key, expr) in &parsed_spec {
1466 let value = match expr {
1467 AggExpr::CountAll => json!(rows.len()),
1468 AggExpr::CountField(path) => {
1469 let n = rows
1470 .iter()
1471 .filter(|r| value_get_by_segments(r, path).is_some())
1472 .count();
1473 json!(n)
1474 }
1475 AggExpr::Sum(path) => {
1476 let mut sum = 0.0f64;
1477 for r in &rows {
1478 if let Some(v) = value_get_by_segments(r, path) {
1479 sum += as_f64(&v).unwrap_or(0.0);
1480 }
1481 }
1482 json!(sum)
1483 }
1484 AggExpr::Avg(path) => {
1485 let mut sum = 0.0f64;
1486 let mut n = 0usize;
1487 for r in &rows {
1488 if let Some(v) = value_get_by_segments(r, path) {
1489 sum += as_f64(&v).unwrap_or(0.0);
1490 n += 1;
1491 }
1492 }
1493 json!(if n == 0 { 0.0 } else { sum / n as f64 })
1494 }
1495 AggExpr::Min(path) => {
1496 let mut best: Option<Value> = None;
1497 for r in &rows {
1498 if let Some(v) = value_get_by_segments(r, path) {
1499 if let Some(cur) = &best {
1500 if cmp_numbers_or_strings(&v, cur).is_some_and(|ord| ord.is_lt()) {
1501 best = Some(v);
1502 }
1503 } else {
1504 best = Some(v);
1505 }
1506 }
1507 }
1508 best.unwrap_or(Value::Null)
1509 }
1510 AggExpr::Max(path) => {
1511 let mut best: Option<Value> = None;
1512 for r in &rows {
1513 if let Some(v) = value_get_by_segments(r, path) {
1514 if let Some(cur) = &best {
1515 if cmp_numbers_or_strings(&v, cur).is_some_and(|ord| ord.is_gt()) {
1516 best = Some(v);
1517 }
1518 } else {
1519 best = Some(v);
1520 }
1521 }
1522 }
1523 best.unwrap_or(Value::Null)
1524 }
1525 };
1526 next.insert(out_key.clone(), value);
1527 }
1528
1529 out.push(Value::Object(next));
1530 }
1531
1532 Ok(Value::Array(out))
1533}
1534
1535fn op_data_join(args: &[Value], ctx: &mut Context) -> EvalResult {
1536 let left = evaluate(require_arg(args, 0, "data.join")?, ctx)?;
1537 let right = evaluate(require_arg(args, 1, "data.join")?, ctx)?;
1538 let options = evaluate(require_arg(args, 2, "data.join")?, ctx)?;
1539 let left_arr = left
1540 .as_array()
1541 .ok_or_else(|| EvalError::new("data.join first arg must be array"))?;
1542 let right_arr = right
1543 .as_array()
1544 .ok_or_else(|| EvalError::new("data.join second arg must be array"))?;
1545 let options = options
1546 .as_object()
1547 .ok_or_else(|| EvalError::new("data.join third arg must be object"))?;
1548
1549 let on_fields = options
1550 .get("on")
1551 .and_then(Value::as_array)
1552 .ok_or_else(|| EvalError::new("data.join requires options.on array"))?;
1553 let join_type = options
1554 .get("type")
1555 .and_then(Value::as_str)
1556 .unwrap_or("inner")
1557 .to_ascii_lowercase();
1558 let is_left_join = join_type == "left";
1559
1560 let mut parsed_on = Vec::new();
1561 for f in on_fields {
1562 let Some(path) = f.as_str() else {
1563 continue;
1564 };
1565 if let Some(segs) = parse_path(path) {
1566 parsed_on.push((path.to_string(), segs));
1567 }
1568 }
1569
1570 let mut right_index: HashMap<String, Vec<Value>> = HashMap::new();
1571 for row in right_arr {
1572 let key_vals: Vec<Value> = parsed_on
1573 .iter()
1574 .map(|(_, segs)| value_get_by_segments(row, segs).unwrap_or(Value::Null))
1575 .collect();
1576 let sig = serde_json::to_string(&key_vals).unwrap_or_else(|_| "[]".to_string());
1577 right_index.entry(sig).or_default().push(row.clone());
1578 }
1579
1580 let mut out = Vec::new();
1581 for l in left_arr {
1582 let key_vals: Vec<Value> = parsed_on
1583 .iter()
1584 .map(|(_, segs)| value_get_by_segments(l, segs).unwrap_or(Value::Null))
1585 .collect();
1586 let sig = serde_json::to_string(&key_vals).unwrap_or_else(|_| "[]".to_string());
1587 let matched = right_index.get(&sig);
1588
1589 if let Some(rs) = matched {
1590 for r in rs {
1591 let mut merged = l.clone();
1592 if let (Some(lo), Some(ro)) = (merged.as_object_mut(), r.as_object()) {
1593 for (k, v) in ro {
1594 if lo.contains_key(k) {
1595 lo.insert(format!("right_{k}"), v.clone());
1596 } else {
1597 lo.insert(k.clone(), v.clone());
1598 }
1599 }
1600 }
1601 out.push(merged);
1602 }
1603 } else if is_left_join {
1604 out.push(l.clone());
1605 }
1606 }
1607
1608 Ok(Value::Array(out))
1609}
1610
1611fn op_log(args: &[Value], ctx: &mut Context) -> EvalResult {
1612 let mut out = Vec::with_capacity(args.len());
1613 for arg in args {
1614 out.push(evaluate(arg, ctx)?);
1615 }
1616 eprintln!("[josie.log] {}", Value::Array(out.clone()));
1617 Ok(Value::Array(out))
1618}
1619
1620fn op_len(args: &[Value], ctx: &mut Context) -> EvalResult {
1621 let v = evaluate(require_arg(args, 0, "len")?, ctx)?;
1622 let n = match v {
1623 Value::String(s) => s.chars().count(),
1624 Value::Array(a) => a.len(),
1625 Value::Object(o) => o.len(),
1626 _ => 0,
1627 };
1628 Ok(json!(n))
1629}
1630
1631fn op_push(args: &[Value], ctx: &mut Context) -> EvalResult {
1632 let list = evaluate(require_arg(args, 0, "push")?, ctx)?;
1633 let item = evaluate(require_arg(args, 1, "push")?, ctx)?;
1634 let mut arr = match list {
1635 Value::Array(a) => a,
1636 _ => Vec::new(),
1637 };
1638 arr.push(item);
1639 Ok(Value::Array(arr))
1640}
1641
1642fn op_get(args: &[Value], ctx: &mut Context) -> EvalResult {
1643 let collection = evaluate(require_arg(args, 0, "get")?, ctx)?;
1644 let key = evaluate(require_arg(args, 1, "get")?, ctx)?;
1645 let out = match (collection, key) {
1646 (Value::Object(obj), Value::String(k)) => obj.get(&k).cloned(),
1647 (Value::Array(arr), Value::Number(n)) => {
1648 n.as_u64().and_then(|i| arr.get(i as usize).cloned())
1649 }
1650 (Value::Array(arr), Value::String(s)) => {
1651 s.parse::<usize>().ok().and_then(|i| arr.get(i).cloned())
1652 }
1653 _ => None,
1654 };
1655 Ok(out.unwrap_or(Value::Null))
1656}
1657
1658fn op_do(args: &[Value], ctx: &mut Context) -> EvalResult {
1659 let mut last = Value::Null;
1660 for arg in args {
1661 last = evaluate(arg, ctx)?;
1662 }
1663 Ok(last)
1664}
1665
1666fn op_pipe(args: &[Value], ctx: &mut Context) -> EvalResult {
1667 let prev_pipe = ctx.state.client.get("pipe").cloned();
1668 let out = (|| {
1669 let mut last = Value::Null;
1670 for step in args {
1671 let mut pipe_obj = Map::new();
1672 pipe_obj.insert("prev".to_string(), last.clone());
1673 ctx.state
1674 .client
1675 .insert("pipe".to_string(), Value::Object(pipe_obj));
1676 last = evaluate(step, ctx)?;
1677 }
1678 Ok(last)
1679 })();
1680
1681 if let Some(old) = prev_pipe {
1682 ctx.state.client.insert("pipe".to_string(), old);
1683 } else {
1684 ctx.state.client.remove("pipe");
1685 }
1686 out
1687}
1688
1689fn has_side_effects(node: &Value) -> bool {
1690 match node {
1691 Value::Array(arr) => {
1692 if let Some(op) = arr.first().and_then(Value::as_str) {
1693 match op {
1694 "set" | "def" | "effect" | "log" | "push" | "w.event.prevent" => {
1695 return true;
1696 }
1697 _ => {}
1698 }
1699 }
1700 arr.iter().any(has_side_effects)
1701 }
1702 Value::Object(obj) => obj.values().any(has_side_effects),
1703 _ => false,
1704 }
1705}
1706
1707fn memo_key_from_args(args: &[Value]) -> Option<String> {
1708 let cacheable = args.iter().all(|v| {
1709 matches!(
1710 v,
1711 Value::Null | Value::Bool(_) | Value::Number(_) | Value::String(_)
1712 )
1713 });
1714 if !cacheable {
1715 return None;
1716 }
1717 serde_json::to_string(args).ok()
1718}
1719
1720fn op_def(args: &[Value], ctx: &mut Context) -> EvalResult {
1721 let name = require_arg(args, 0, "def")?
1722 .as_str()
1723 .ok_or_else(|| EvalError::new("def name must be string"))?
1724 .to_string();
1725 let params = args
1726 .get(1)
1727 .and_then(Value::as_array)
1728 .map(|arr| {
1729 arr.iter()
1730 .filter_map(|v| v.as_str().map(ToString::to_string))
1731 .collect::<Vec<_>>()
1732 })
1733 .unwrap_or_default();
1734 let body = require_arg(args, 2, "def")?.clone();
1735 let memoizable = !has_side_effects(&body);
1736 ctx.state.fns.insert(
1737 name.clone(),
1738 FunctionDef {
1739 params,
1740 body,
1741 memoizable,
1742 },
1743 );
1744 ctx.state.call_memo.remove(&name);
1746 Ok(Value::Null)
1747}
1748
1749fn op_call(args: &[Value], ctx: &mut Context) -> EvalResult {
1765 let name = if let Some(s) = require_arg(args, 0, "call")?.as_str() {
1766 s.to_string()
1767 } else {
1768 let name_val = evaluate(require_arg(args, 0, "call")?, ctx)?;
1769 name_val
1770 .as_str()
1771 .ok_or_else(|| EvalError::new("call function name must be string"))?
1772 .to_string()
1773 };
1774
1775 if let Some(def) = ctx.state.fns.get(&name).cloned() {
1776 let mut evaluated_args = Vec::with_capacity(args.len().saturating_sub(1));
1778 for arg in &args[1..] {
1779 evaluated_args.push(evaluate(arg, ctx)?);
1780 }
1781
1782 let memo_key = if def.memoizable {
1783 memo_key_from_args(&evaluated_args)
1784 } else {
1785 None
1786 };
1787 if let Some(key) = memo_key.as_ref() {
1788 if let Some(v) = ctx
1789 .state
1790 .call_memo
1791 .get(&name)
1792 .and_then(|entry| entry.get(key))
1793 .cloned()
1794 {
1795 return Ok(v);
1796 }
1797 }
1798
1799 let mut frame: HashMap<String, Value> = HashMap::with_capacity(def.params.len());
1801 for (idx, param) in def.params.iter().enumerate() {
1802 let new_val = evaluated_args.get(idx).cloned().unwrap_or(Value::Null);
1803 frame.insert(param.clone(), new_val);
1804 }
1805 ctx.state.call_locals.push(frame);
1806
1807 let result = evaluate(&def.body, ctx);
1808
1809 let _ = ctx.state.call_locals.pop();
1811
1812 if let (Some(key), Ok(v)) = (memo_key, &result) {
1813 ctx.state
1814 .call_memo
1815 .entry(name.clone())
1816 .or_default()
1817 .insert(key, v.clone());
1818 }
1819
1820 return result;
1821 }
1822
1823 if let Some(op) = ctx.operators.get(&name) {
1824 return op(&args[1..], ctx);
1825 }
1826
1827 Err(EvalError::new(format!("function '{name}' not found")))
1828}
1829
1830fn with_item_scope<F>(ctx: &mut Context, item: Value, index: usize, f: F) -> EvalResult
1831where
1832 F: FnOnce(&mut Context) -> EvalResult,
1833{
1834 let old_item = ctx.state.client.insert("item".to_string(), item);
1835 let old_index = ctx.state.client.insert("index".to_string(), json!(index));
1836 let out = f(ctx);
1837 if let Some(prev) = old_item {
1838 ctx.state.client.insert("item".to_string(), prev);
1839 } else {
1840 ctx.state.client.remove("item");
1841 }
1842 if let Some(prev) = old_index {
1843 ctx.state.client.insert("index".to_string(), prev);
1844 } else {
1845 ctx.state.client.remove("index");
1846 }
1847 out
1848}
1849
1850fn op_map(args: &[Value], ctx: &mut Context) -> EvalResult {
1851 let list = evaluate(require_arg(args, 0, "map")?, ctx)?;
1852 let expr = require_arg(args, 1, "map")?.clone();
1853 let arr = list
1854 .as_array()
1855 .cloned()
1856 .ok_or_else(|| EvalError::new("map first arg must evaluate to array"))?;
1857 let mut out = Vec::with_capacity(arr.len());
1858 for (i, item) in arr.into_iter().enumerate() {
1859 out.push(with_item_scope(ctx, item, i, |inner| {
1860 evaluate(&expr, inner)
1861 })?);
1862 }
1863 Ok(Value::Array(out))
1864}
1865
1866fn op_filter(args: &[Value], ctx: &mut Context) -> EvalResult {
1867 let list = evaluate(require_arg(args, 0, "filter")?, ctx)?;
1868 let expr = require_arg(args, 1, "filter")?.clone();
1869 let arr = list
1870 .as_array()
1871 .cloned()
1872 .ok_or_else(|| EvalError::new("filter first arg must evaluate to array"))?;
1873 let mut out = Vec::new();
1874 for (i, item) in arr.into_iter().enumerate() {
1875 let keep = with_item_scope(ctx, item.clone(), i, |inner| evaluate(&expr, inner))?;
1876 if truthy(&keep) {
1877 out.push(item);
1878 }
1879 }
1880 Ok(Value::Array(out))
1881}
1882
1883fn op_match(args: &[Value], ctx: &mut Context) -> EvalResult {
1884 let value = evaluate(require_arg(args, 0, "match")?, ctx)?;
1885 let mut i = 1usize;
1886 while i + 1 < args.len() {
1887 let pat = evaluate(&args[i], ctx)?;
1888 if pat == Value::String("_".to_string()) || pat == value {
1889 return evaluate(&args[i + 1], ctx);
1890 }
1891 i += 2;
1892 }
1893 Ok(Value::Null)
1894}
1895
1896fn op_event_value(_args: &[Value], ctx: &mut Context) -> EvalResult {
1897 Ok(ctx.event.map(|e| e.value.clone()).unwrap_or(Value::Null))
1898}
1899
1900fn op_event_key(_args: &[Value], ctx: &mut Context) -> EvalResult {
1901 Ok(ctx.event.map(|e| e.key.clone()).unwrap_or(Value::Null))
1902}
1903
1904fn op_event_prevent(_args: &[Value], ctx: &mut Context) -> EvalResult {
1905 Ok(Value::Bool(ctx.event.map(|e| e.prevent).unwrap_or(false)))
1906}
1907
1908fn op_effect(args: &[Value], ctx: &mut Context) -> EvalResult {
1909 let mut last = Value::Null;
1910 for arg in args {
1911 last = evaluate(arg, ctx)?;
1912 }
1913 Ok(last)
1914}
1915
1916#[cfg(test)]
1917mod tests {
1918 use super::*;
1919 use serde_json::json;
1920
1921 fn eval_expr(expr: Value) -> Value {
1922 let mut state = State::new();
1923 let operators = Operators::new();
1924 let mut ctx = Context {
1925 state: &mut state,
1926 operators: &operators,
1927 event: None,
1928 };
1929 evaluate(&expr, &mut ctx).expect("evaluate")
1930 }
1931
1932 #[test]
1933 fn util_path_ops_work() {
1934 let payload = json!({
1935 "headers": {
1936 "Authorization": "Bearer A"
1937 },
1938 "user": {
1939 "id": 7,
1940 "profile": {
1941 "email": "x@example.com"
1942 }
1943 },
1944 "items": [{"sku":"a"}, {"sku":"b"}]
1945 });
1946
1947 let v = eval_expr(json!([
1948 "util.get_path",
1949 payload.clone(),
1950 "user.profile.email",
1951 null
1952 ]));
1953 assert_eq!(v, json!("x@example.com"));
1954
1955 let v = eval_expr(json!([
1956 "util.one_of",
1957 payload.clone(),
1958 [
1959 "headers.authorization",
1960 "headers.Authorization",
1961 "header.auth"
1962 ],
1963 null
1964 ]));
1965 assert_eq!(v, json!("Bearer A"));
1966
1967 let v = eval_expr(json!([
1968 "util.pick",
1969 payload,
1970 ["user.id", "user.profile.email", "items[1].sku"]
1971 ]));
1972 assert_eq!(
1973 v,
1974 json!({
1975 "user": {
1976 "id": 7,
1977 "profile": { "email": "x@example.com" }
1978 },
1979 "items": [null, {"sku":"b"}]
1980 })
1981 );
1982
1983 let v = eval_expr(json!(["util.exists", json!({"a":{"b":[1,2,3]}}), "a.b[1]"]));
1984 assert_eq!(v, json!(true));
1985
1986 let v = eval_expr(json!([
1987 "util.omit",
1988 json!({"a":1,"b":{"c":2},"arr":[1,2,3]}),
1989 ["b.c", "arr[1]"]
1990 ]));
1991 assert_eq!(v, json!({"a":1,"b":{},"arr":[1,null,3]}));
1992 }
1993
1994 #[test]
1995 fn util_replace_ops_work() {
1996 let v = eval_expr(json!(["util.replace", "a-b-c", "-", "_"]));
1997 assert_eq!(v, json!("a_b-c"));
1998
1999 let v = eval_expr(json!(["util.replace_all", "a-b-c", "-", "_"]));
2000 assert_eq!(v, json!("a_b_c"));
2001
2002 let v = eval_expr(json!(["util.as_array", 9]));
2003 assert_eq!(v, json!([9]));
2004
2005 let v = eval_expr(json!(["util.to_bool", "YES"]));
2006 assert_eq!(v, json!(true));
2007
2008 let v = eval_expr(json!(["util.split", "a,b,c", ","]));
2009 assert_eq!(v, json!(["a", "b", "c"]));
2010
2011 let v = eval_expr(json!(["util.join", ["a", 2, true], "-"]));
2012 assert_eq!(v, json!("a-2-true"));
2013 }
2014
2015 #[test]
2016 fn util_regex_ops_work() {
2017 let v = eval_expr(json!(["util.regex_match", "HELLO", "^hello$", {"case_insensitive": true}]));
2018 assert_eq!(v, json!(true));
2019
2020 let v = eval_expr(json!(["util.regex_find", "order-123-x", "\\d+"]));
2021 assert_eq!(v, json!("123"));
2022
2023 let v = eval_expr(json!(["util.regex_find_all", "a1b22c333", "\\d+"]));
2024 assert_eq!(v, json!(["1", "22", "333"]));
2025
2026 let v = eval_expr(json!(["util.regex_replace", "a1b2", "\\d+", "X"]));
2027 assert_eq!(v, json!("aXbX"));
2028
2029 let v = eval_expr(json!(["util.regex_replace", "a1b2", "\\d+", "X", {"all": false}]));
2030 assert_eq!(v, json!("aXb2"));
2031 }
2032
2033 #[test]
2034 fn data_ops_work() {
2035 let rows = json!([
2036 {"id":"1","amount":"12.5","name":"ALICE","extra":{"k":"x"}},
2037 {"id":"2","amount":"7","name":"BOB","extra":{"k":"y"}}
2038 ]);
2039
2040 let v = eval_expr(json!([
2041 "data.select",
2042 rows.clone(),
2043 ["id", "extra.k", "missing"]
2044 ]));
2045 assert_eq!(
2046 v,
2047 json!([
2048 {"id":"1","extra":{"k":"x"},"missing":null},
2049 {"id":"2","extra":{"k":"y"},"missing":null}
2050 ])
2051 );
2052
2053 let v =
2054 eval_expr(json!(["data.rename", rows.clone(), {"name":"full_name", "extra.k":"kind"}]));
2055 assert_eq!(
2056 v,
2057 json!([
2058 {"id":"1","amount":"12.5","full_name":"ALICE","extra":{},"kind":"x"},
2059 {"id":"2","amount":"7","full_name":"BOB","extra":{},"kind":"y"}
2060 ])
2061 );
2062
2063 let v = eval_expr(json!(["data.cast", rows, {"id":"int", "amount":"float"}]));
2064 assert_eq!(
2065 v,
2066 json!([
2067 {"id":1,"amount":12.5,"name":"ALICE","extra":{"k":"x"}},
2068 {"id":2,"amount":7.0,"name":"BOB","extra":{"k":"y"}}
2069 ])
2070 );
2071
2072 let v = eval_expr(json!(["data.chunk", [1, 2, 3, 4, 5], 2]));
2073 assert_eq!(v, json!([[1, 2], [3, 4], [5]]));
2074
2075 let v = eval_expr(json!([
2076 "data.flat_map",
2077 [
2078 {"id":1,"items":[{"sku":"a"},{"sku":"b"}]},
2079 {"id":2,"items":[{"sku":"c"}]}
2080 ],
2081 "items"
2082 ]));
2083 assert_eq!(
2084 v,
2085 json!([
2086 {"id":1,"items":{"sku":"a"}},
2087 {"id":1,"items":{"sku":"b"}},
2088 {"id":2,"items":{"sku":"c"}}
2089 ])
2090 );
2091
2092 let v = eval_expr(json!([
2093 "data.distinct",
2094 [
2095 {"id":1,"email":"a@x"},
2096 {"id":2,"email":"a@x"},
2097 {"id":3,"email":"b@x"}
2098 ],
2099 ["email"]
2100 ]));
2101 assert_eq!(
2102 v,
2103 json!([
2104 {"id":1,"email":"a@x"},
2105 {"id":3,"email":"b@x"}
2106 ])
2107 );
2108
2109 let v = eval_expr(json!([
2110 "data.sort_by",
2111 [
2112 {"id":1,"score":9},
2113 {"id":2,"score":3},
2114 {"id":3,"score":7}
2115 ],
2116 [{"col":"score","dir":"desc"}]
2117 ]));
2118 assert_eq!(
2119 v,
2120 json!([
2121 {"id":1,"score":9},
2122 {"id":3,"score":7},
2123 {"id":2,"score":3}
2124 ])
2125 );
2126
2127 let grouped = eval_expr(json!([
2128 "data.group_by",
2129 [
2130 {"user":"u1","amount":10},
2131 {"user":"u1","amount":5},
2132 {"user":"u2","amount":3}
2133 ],
2134 ["user"]
2135 ]));
2136 let v = eval_expr(json!([
2137 "data.aggregate",
2138 grouped,
2139 {"n":"count(*)","total":"sum(amount)"}
2140 ]));
2141 assert_eq!(
2142 v,
2143 json!([
2144 {"user":"u1","n":2,"total":15.0},
2145 {"user":"u2","n":1,"total":3.0}
2146 ])
2147 );
2148
2149 let v = eval_expr(json!([
2150 "data.join",
2151 [
2152 {"user":"u1","l":1},
2153 {"user":"u2","l":2}
2154 ],
2155 [
2156 {"user":"u1","r":"A"}
2157 ],
2158 {"on":["user"],"type":"left"}
2159 ]));
2160 assert_eq!(
2161 v,
2162 json!([
2163 {"user":"u1","l":1,"right_user":"u1","r":"A"},
2164 {"user":"u2","l":2}
2165 ])
2166 );
2167 }
2168
2169 #[test]
2170 fn pipe_restores_scope_on_error_without_previous_pipe() {
2171 let mut state = State::new();
2172 state.client.insert("stable".to_string(), json!("ok"));
2173 let operators = Operators::new();
2174 let mut ctx = Context {
2175 state: &mut state,
2176 operators: &operators,
2177 event: None,
2178 };
2179 let out = evaluate(
2180 &json!(["pipe", ["set", "x", 1], ["call", "missing_function"]]),
2181 &mut ctx,
2182 );
2183 assert!(out.is_err());
2184 assert!(ctx.state.client.get("pipe").is_none());
2185 assert_eq!(ctx.state.client.get("stable"), Some(&json!("ok")));
2186 }
2187
2188 #[test]
2189 fn pipe_restores_previous_pipe_on_error() {
2190 let mut state = State::new();
2191 let prev = json!({"prev":"seed"});
2192 state.client.insert("pipe".to_string(), prev.clone());
2193 let operators = Operators::new();
2194 let mut ctx = Context {
2195 state: &mut state,
2196 operators: &operators,
2197 event: None,
2198 };
2199 let out = evaluate(&json!(["pipe", ["call", "missing_function"]]), &mut ctx);
2200 assert!(out.is_err());
2201 assert_eq!(ctx.state.client.get("pipe"), Some(&prev));
2202 }
2203}