1use serde_json::{json, Map, Value as JsonValue};
2
3pub fn echo(args: &JsonValue) -> JsonValue {
4 let message = args
5 .get("message")
6 .and_then(|v| v.as_str())
7 .unwrap_or("")
8 .to_string();
9 json!({ "message": message })
10}
11
12pub fn tap(args: &JsonValue) -> JsonValue {
13 args.get("__input").cloned().unwrap_or_else(|| {
14 let message = args
15 .get("message")
16 .and_then(|v| v.as_str())
17 .unwrap_or("")
18 .to_string();
19 serde_json::json!({ "message": message })
20 })
21}
22
23pub fn pack_state_data(args: &JsonValue) -> JsonValue {
24 let state = args.get("state").cloned().unwrap_or(JsonValue::Null);
25 let data = args
26 .get("data")
27 .cloned()
28 .or_else(|| args.get("__input").cloned())
29 .unwrap_or(JsonValue::Null);
30
31 serde_json::json!({
32 "state": state,
33 "data": data,
34 })
35}
36
37pub fn get_state(args: &JsonValue) -> JsonValue {
38 args.get("input")
39 .or_else(|| args.get("__input"))
40 .and_then(|v| v.get("state"))
41 .cloned()
42 .unwrap_or(JsonValue::Null)
43}
44
45pub fn get_data(args: &JsonValue) -> JsonValue {
46 args.get("input")
47 .or_else(|| args.get("__input"))
48 .and_then(|v| v.get("data"))
49 .cloned()
50 .unwrap_or(JsonValue::Null)
51}
52
53pub fn apply_lane(args: &JsonValue) -> JsonValue {
54 let lane = args.get("lane").and_then(|v| v.as_str()).unwrap_or("");
55 let fields = args
56 .get("fields")
57 .and_then(|v| v.as_object())
58 .cloned()
59 .unwrap_or_default();
60
61 let input = args
62 .get("input")
63 .cloned()
64 .or_else(|| args.get("__input").cloned())
65 .unwrap_or(JsonValue::Null);
66
67 let mut out = match input {
68 JsonValue::Object(map) => map,
69 other => {
70 let mut map = Map::new();
71 map.insert("data".to_string(), other);
72 map
73 }
74 };
75
76 let mut lane_obj = out
77 .get(lane)
78 .and_then(|v| v.as_object())
79 .cloned()
80 .unwrap_or_default();
81
82 for (k, v) in fields {
83 lane_obj.insert(k, v);
84 }
85
86 out.insert(lane.to_string(), JsonValue::Object(lane_obj));
87 JsonValue::Object(out)
88}
89
90pub fn map(args: &JsonValue) -> JsonValue {
91 let items = core_items(args);
92 let Some(field) = args.get("field").and_then(|v| v.as_str()) else {
93 return JsonValue::Array(items);
94 };
95
96 JsonValue::Array(
97 items
98 .into_iter()
99 .map(|item| item.get(field).cloned().unwrap_or(JsonValue::Null))
100 .collect(),
101 )
102}
103
104pub fn filter(args: &JsonValue) -> JsonValue {
105 let items = core_items(args);
106 let Some(field) = args.get("field").and_then(|v| v.as_str()) else {
107 return JsonValue::Array(items);
108 };
109 let equals = args.get("equals").cloned().unwrap_or(JsonValue::Null);
110
111 JsonValue::Array(
112 items
113 .into_iter()
114 .filter(|item| item.get(field).map(|v| v == &equals).unwrap_or(false))
115 .collect(),
116 )
117}
118
119pub fn find(args: &JsonValue) -> JsonValue {
120 let items = core_items(args);
121 let Some(field) = args.get("field").and_then(|v| v.as_str()) else {
122 return JsonValue::Null;
123 };
124 let equals = args.get("equals").cloned().unwrap_or(JsonValue::Null);
125
126 items
127 .into_iter()
128 .find(|item| item.get(field).map(|v| v == &equals).unwrap_or(false))
129 .unwrap_or(JsonValue::Null)
130}
131
132pub fn reduce(args: &JsonValue) -> JsonValue {
133 let items = core_items(args);
134 let mode = args.get("mode").and_then(|v| v.as_str()).unwrap_or("last");
135
136 match mode {
137 "sum" => {
138 let sum = items
139 .iter()
140 .filter_map(|v| v.as_f64())
141 .fold(0.0, |acc, n| acc + n);
142 json!(sum)
143 }
144 "min" => items
145 .iter()
146 .filter_map(|v| v.as_f64())
147 .reduce(f64::min)
148 .map(|v| json!(v))
149 .unwrap_or(JsonValue::Null),
150 "max" => items
151 .iter()
152 .filter_map(|v| v.as_f64())
153 .reduce(f64::max)
154 .map(|v| json!(v))
155 .unwrap_or(JsonValue::Null),
156 "avg" => {
157 let nums = items.iter().filter_map(|v| v.as_f64()).collect::<Vec<_>>();
158 if nums.is_empty() {
159 JsonValue::Null
160 } else {
161 json!(nums.iter().sum::<f64>() / nums.len() as f64)
162 }
163 }
164 "concat" => {
165 let initial = args
166 .get("initial")
167 .and_then(|v| v.as_str())
168 .unwrap_or("");
169 let joined = items
170 .iter()
171 .map(core_scalar_to_key)
172 .collect::<Vec<_>>()
173 .join("");
174 json!(format!("{initial}{joined}"))
175 }
176 "count" => json!(items.len()),
177 "first" => items.first().cloned().unwrap_or(JsonValue::Null),
178 "last" => items.last().cloned().unwrap_or(JsonValue::Null),
179 _ => json!({
180 "error": format!("unsupported reduce mode '{mode}'"),
181 "supported_modes": ["sum", "min", "max", "avg", "concat", "count", "first", "last"]
182 }),
183 }
184}
185
186pub fn group_by(args: &JsonValue) -> JsonValue {
187 let items = core_items(args);
188 let Some(field) = args.get("field").and_then(|v| v.as_str()) else {
189 return JsonValue::Object(Map::new());
190 };
191
192 let mut grouped = Map::new();
193 for item in items {
194 let key = item
195 .get(field)
196 .map(core_scalar_to_key)
197 .unwrap_or_else(|| "null".to_string());
198
199 let entry = grouped
200 .entry(key)
201 .or_insert_with(|| JsonValue::Array(Vec::new()));
202 if let Some(values) = entry.as_array_mut() {
203 values.push(item);
204 }
205 }
206
207 JsonValue::Object(grouped)
208}
209
210pub fn merge(args: &JsonValue) -> JsonValue {
211 let left = args
212 .get("left")
213 .and_then(|v| v.as_object())
214 .cloned()
215 .or_else(|| args.get("__input").and_then(|v| v.as_object()).cloned())
216 .unwrap_or_default();
217
218 let right = args
219 .get("right")
220 .and_then(|v| v.as_object())
221 .cloned()
222 .unwrap_or_default();
223
224 let mut merged = left;
225 for (k, v) in right {
226 merged.insert(k, v);
227 }
228
229 JsonValue::Object(merged)
230}
231
232pub fn pick(args: &JsonValue) -> JsonValue {
233 let input = args
234 .get("input")
235 .and_then(|v| v.as_object())
236 .cloned()
237 .or_else(|| args.get("__input").and_then(|v| v.as_object()).cloned())
238 .unwrap_or_default();
239
240 let fields = args
241 .get("fields")
242 .and_then(|v| v.as_array())
243 .cloned()
244 .unwrap_or_default();
245
246 let mut picked = Map::new();
247 for field in fields.iter().filter_map(|v| v.as_str()) {
248 if let Some(value) = input.get(field) {
249 picked.insert(field.to_string(), value.clone());
250 }
251 }
252 JsonValue::Object(picked)
253}
254
255pub fn validate_schema(args: &JsonValue) -> JsonValue {
256 let required = args
257 .get("required")
258 .and_then(|v| v.as_array())
259 .cloned()
260 .unwrap_or_default();
261 let data = args
262 .get("data")
263 .and_then(|v| v.as_object())
264 .cloned()
265 .unwrap_or_default();
266
267 let missing = required
268 .iter()
269 .filter_map(|v| v.as_str())
270 .filter(|name| !data.contains_key(*name))
271 .map(|s| JsonValue::String(s.to_string()))
272 .collect::<Vec<_>>();
273
274 json!({
275 "ok": missing.is_empty(),
276 "missing": missing,
277 })
278}
279
280pub fn add(args: &JsonValue) -> JsonValue {
281 let Some(a) = core_arg_f64(args, "a") else {
282 return json!({ "error": "missing numeric arg: a" });
283 };
284 let Some(b) = core_arg_f64(args, "b") else {
285 return json!({ "error": "missing numeric arg: b" });
286 };
287 json!(a + b)
288}
289
290pub fn sub(args: &JsonValue) -> JsonValue {
291 let Some(a) = core_arg_f64(args, "a") else {
292 return json!({ "error": "missing numeric arg: a" });
293 };
294 let Some(b) = core_arg_f64(args, "b") else {
295 return json!({ "error": "missing numeric arg: b" });
296 };
297 json!(a - b)
298}
299
300pub fn inc(args: &JsonValue) -> JsonValue {
301 let value = core_arg_f64(args, "value")
302 .or_else(|| args.get("__input").and_then(|v| v.as_f64()))
303 .unwrap_or(0.0);
304 json!(value + 1.0)
305}
306
307pub fn dec(args: &JsonValue) -> JsonValue {
308 let value = core_arg_f64(args, "value")
309 .or_else(|| args.get("__input").and_then(|v| v.as_f64()))
310 .unwrap_or(0.0);
311 json!(value - 1.0)
312}
313
314pub fn eq(args: &JsonValue) -> JsonValue {
315 let a = args.get("a").cloned().unwrap_or(JsonValue::Null);
316 let b = args.get("b").cloned().unwrap_or(JsonValue::Null);
317 json!({ "value": a == b })
318}
319
320pub fn lt(args: &JsonValue) -> JsonValue {
321 compare_numeric(args, |a, b| a < b)
322}
323
324pub fn gt(args: &JsonValue) -> JsonValue {
325 compare_numeric(args, |a, b| a > b)
326}
327
328pub fn gte(args: &JsonValue) -> JsonValue {
329 compare_numeric(args, |a, b| a >= b)
330}
331
332pub fn lte(args: &JsonValue) -> JsonValue {
333 compare_numeric(args, |a, b| a <= b)
334}
335
336pub fn inc_field(args: &JsonValue) -> JsonValue {
337 bump_field(args, 1.0)
338}
339
340pub fn dec_field(args: &JsonValue) -> JsonValue {
341 bump_field(args, -1.0)
342}
343
344pub fn set_fields(args: &JsonValue) -> JsonValue {
345 let fields = args
346 .get("fields")
347 .and_then(|v| v.as_object())
348 .cloned()
349 .unwrap_or_default();
350
351 let mut input = args
352 .get("input")
353 .and_then(|v| v.as_object())
354 .cloned()
355 .or_else(|| args.get("__input").and_then(|v| v.as_object()).cloned())
356 .unwrap_or_default();
357
358 for (key, value) in fields {
359 input.insert(key, value);
360 }
361
362 JsonValue::Object(input)
363}
364
365pub fn split(args: &JsonValue) -> JsonValue {
366 let text = core_arg_text(args, "text");
367 let sep = args.get("sep").and_then(|v| v.as_str()).unwrap_or(",");
368
369 JsonValue::Array(
370 text.split(sep)
371 .map(|s| JsonValue::String(s.to_string()))
372 .collect(),
373 )
374}
375
376pub fn join(args: &JsonValue) -> JsonValue {
377 let sep = args.get("sep").and_then(|v| v.as_str()).unwrap_or(",");
378 let items = core_items(args);
379 let joined = items
380 .iter()
381 .map(core_scalar_to_key)
382 .collect::<Vec<_>>()
383 .join(sep);
384 json!({ "text": joined })
385}
386
387pub fn replace(args: &JsonValue) -> JsonValue {
388 let text = core_arg_text(args, "text");
389 let from = args.get("from").and_then(|v| v.as_str()).unwrap_or("");
390 let to = args.get("to").and_then(|v| v.as_str()).unwrap_or("");
391 json!({ "text": text.replace(from, to) })
392}
393
394pub fn trim(args: &JsonValue) -> JsonValue {
395 let text = core_arg_text(args, "text");
396 json!({ "text": text.trim() })
397}
398
399pub fn lower(args: &JsonValue) -> JsonValue {
400 let text = core_arg_text(args, "text");
401 json!({ "text": text.to_lowercase() })
402}
403
404pub fn upper(args: &JsonValue) -> JsonValue {
405 let text = core_arg_text(args, "text");
406 json!({ "text": text.to_uppercase() })
407}
408
409pub fn contains(args: &JsonValue) -> JsonValue {
410 let haystack = args
411 .get("haystack")
412 .cloned()
413 .or_else(|| args.get("__input").cloned())
414 .unwrap_or(JsonValue::Null);
415 let needle = args.get("needle").cloned().unwrap_or(JsonValue::Null);
416
417 let contains = match &haystack {
418 JsonValue::String(s) => needle.as_str().map(|n| s.contains(n)).unwrap_or(false),
419 JsonValue::Array(items) => items.iter().any(|item| item == &needle),
420 JsonValue::Object(map) => needle.as_str().map(|k| map.contains_key(k)).unwrap_or(false),
421 _ => false,
422 };
423
424 json!({ "contains": contains })
425}
426
427pub fn get_path(args: &JsonValue) -> JsonValue {
428 let path = args.get("path").and_then(|v| v.as_str()).unwrap_or("");
429 let input = args
430 .get("input")
431 .cloned()
432 .or_else(|| args.get("__input").cloned())
433 .unwrap_or(JsonValue::Null);
434
435 json_get_path_value(&input, path).unwrap_or(JsonValue::Null)
436}
437
438pub fn set_path(args: &JsonValue) -> JsonValue {
439 let path = args.get("path").and_then(|v| v.as_str()).unwrap_or("");
440 let value = args.get("value").cloned().unwrap_or(JsonValue::Null);
441 let input = args
442 .get("input")
443 .cloned()
444 .or_else(|| args.get("__input").cloned())
445 .unwrap_or_else(|| JsonValue::Object(Map::new()));
446
447 json_set_path_value(&input, path, value)
448}
449
450pub fn has_path(args: &JsonValue) -> JsonValue {
451 let path = args.get("path").and_then(|v| v.as_str()).unwrap_or("");
452 let input = args
453 .get("input")
454 .cloned()
455 .or_else(|| args.get("__input").cloned())
456 .unwrap_or(JsonValue::Null);
457
458 json!({ "has_path": json_get_path_value(&input, path).is_some() })
459}
460
461fn core_items(args: &JsonValue) -> Vec<JsonValue> {
462 args.get("items")
463 .and_then(|v| v.as_array())
464 .cloned()
465 .or_else(|| args.get("__input").and_then(|v| v.as_array()).cloned())
466 .unwrap_or_default()
467}
468
469fn core_scalar_to_key(value: &JsonValue) -> String {
470 match value {
471 JsonValue::String(s) => s.clone(),
472 JsonValue::Number(n) => n.to_string(),
473 JsonValue::Bool(b) => b.to_string(),
474 JsonValue::Null => "null".to_string(),
475 _ => serde_json::to_string(value).unwrap_or_else(|_| "<value>".to_string()),
476 }
477}
478
479fn json_get_path_value(input: &JsonValue, path: &str) -> Option<JsonValue> {
480 let segments = path.split('.').filter(|s| !s.is_empty()).collect::<Vec<_>>();
481 if segments.is_empty() {
482 return Some(input.clone());
483 }
484
485 let mut current = input;
486 for segment in segments {
487 current = match current {
488 JsonValue::Object(map) => map.get(segment)?,
489 JsonValue::Array(items) => {
490 let idx = segment.parse::<usize>().ok()?;
491 items.get(idx)?
492 }
493 _ => return None,
494 };
495 }
496
497 Some(current.clone())
498}
499
500fn json_set_path_value(input: &JsonValue, path: &str, value: JsonValue) -> JsonValue {
501 let segments = path.split('.').filter(|s| !s.is_empty()).collect::<Vec<_>>();
502 if segments.is_empty() {
503 return input.clone();
504 }
505
506 let mut output = if input.is_object() {
507 input.clone()
508 } else {
509 JsonValue::Object(Map::new())
510 };
511
512 set_path_recursive(&mut output, &segments, value);
513 output
514}
515
516fn set_path_recursive(node: &mut JsonValue, segments: &[&str], value: JsonValue) {
517 if segments.is_empty() {
518 return;
519 }
520
521 if segments.len() == 1 {
522 if !node.is_object() {
523 *node = JsonValue::Object(Map::new());
524 }
525 if let Some(map) = node.as_object_mut() {
526 map.insert(segments[0].to_string(), value);
527 }
528 return;
529 }
530
531 if !node.is_object() {
532 *node = JsonValue::Object(Map::new());
533 }
534
535 if let Some(map) = node.as_object_mut() {
536 let child = map
537 .entry(segments[0].to_string())
538 .or_insert_with(|| JsonValue::Object(Map::new()));
539 set_path_recursive(child, &segments[1..], value);
540 }
541}
542
543fn core_arg_text(args: &JsonValue, key: &str) -> String {
544 args.get(key)
545 .and_then(|v| v.as_str())
546 .map(ToOwned::to_owned)
547 .or_else(|| args.get("__input").and_then(|v| v.as_str()).map(ToOwned::to_owned))
548 .unwrap_or_default()
549}
550
551fn core_arg_f64(args: &JsonValue, key: &str) -> Option<f64> {
552 args.get(key)
553 .and_then(|v| v.as_f64().or_else(|| v.as_str().and_then(|s| s.parse::<f64>().ok())))
554}
555
556fn compare_numeric(args: &JsonValue, pred: fn(f64, f64) -> bool) -> JsonValue {
557 let Some(a) = core_arg_f64(args, "a") else {
558 return json!({ "error": "missing numeric arg: a" });
559 };
560 let Some(b) = core_arg_f64(args, "b") else {
561 return json!({ "error": "missing numeric arg: b" });
562 };
563 json!({ "value": pred(a, b) })
564}
565
566fn bump_field(args: &JsonValue, delta: f64) -> JsonValue {
567 let field = args.get("field").and_then(|v| v.as_str()).unwrap_or("");
568 if field.is_empty() {
569 return json!({ "error": "missing arg: field" });
570 }
571
572 let mut input = args
573 .get("input")
574 .and_then(|v| v.as_object())
575 .cloned()
576 .or_else(|| args.get("__input").and_then(|v| v.as_object()).cloned())
577 .unwrap_or_default();
578
579 let current = input.get(field).and_then(|v| v.as_f64()).unwrap_or(0.0);
580 input.insert(field.to_string(), json!(current + delta));
581 JsonValue::Object(input)
582}