1use serde_json::{json, Map, Value as JsonValue};
2
3pub fn echo(args: &JsonValue) -> JsonValue {
4 let message = core_arg_message(args, "message");
5 json!({ "message": message })
6}
7
8pub fn tap(args: &JsonValue) -> JsonValue {
9 args.get("__input").cloned().unwrap_or_else(|| {
10 let message = core_arg_message(args, "message");
11 serde_json::json!({ "message": message })
12 })
13}
14
15pub fn pack_state_data(args: &JsonValue) -> JsonValue {
16 let state = args.get("state").cloned().unwrap_or(JsonValue::Null);
17 let data = args
18 .get("data")
19 .cloned()
20 .or_else(|| args.get("__input").cloned())
21 .unwrap_or(JsonValue::Null);
22
23 serde_json::json!({
24 "state": state,
25 "data": data,
26 })
27}
28
29pub fn get_state(args: &JsonValue) -> JsonValue {
30 args.get("input")
31 .or_else(|| args.get("__input"))
32 .and_then(|v| v.get("state"))
33 .cloned()
34 .unwrap_or(JsonValue::Null)
35}
36
37pub fn get_data(args: &JsonValue) -> JsonValue {
38 args.get("input")
39 .or_else(|| args.get("__input"))
40 .and_then(|v| v.get("data"))
41 .cloned()
42 .unwrap_or(JsonValue::Null)
43}
44
45pub fn apply_lane(args: &JsonValue) -> JsonValue {
46 let lane = args.get("lane").and_then(|v| v.as_str()).unwrap_or("");
47 let fields = args
48 .get("fields")
49 .and_then(|v| v.as_object())
50 .cloned()
51 .unwrap_or_default();
52
53 let input = args
54 .get("input")
55 .cloned()
56 .or_else(|| args.get("__input").cloned())
57 .unwrap_or(JsonValue::Null);
58
59 let mut out = match input {
60 JsonValue::Object(map) => map,
61 other => {
62 let mut map = Map::new();
63 map.insert("data".to_string(), other);
64 map
65 }
66 };
67
68 let mut lane_obj = out
69 .get(lane)
70 .and_then(|v| v.as_object())
71 .cloned()
72 .unwrap_or_default();
73
74 for (k, v) in fields {
75 lane_obj.insert(k, v);
76 }
77
78 out.insert(lane.to_string(), JsonValue::Object(lane_obj));
79 JsonValue::Object(out)
80}
81
82pub fn map(args: &JsonValue) -> JsonValue {
83 let items = core_items(args);
84 let Some(field) = args.get("field").and_then(|v| v.as_str()) else {
85 return JsonValue::Array(items);
86 };
87
88 JsonValue::Array(
89 items
90 .into_iter()
91 .map(|item| item.get(field).cloned().unwrap_or(JsonValue::Null))
92 .collect(),
93 )
94}
95
96pub fn filter(args: &JsonValue) -> JsonValue {
97 let items = core_items(args);
98 let Some(field) = args.get("field").and_then(|v| v.as_str()) else {
99 return JsonValue::Array(items);
100 };
101 let equals = args.get("equals").cloned().unwrap_or(JsonValue::Null);
102
103 JsonValue::Array(
104 items
105 .into_iter()
106 .filter(|item| item.get(field).map(|v| v == &equals).unwrap_or(false))
107 .collect(),
108 )
109}
110
111pub fn find(args: &JsonValue) -> JsonValue {
112 let items = core_items(args);
113 let Some(field) = args.get("field").and_then(|v| v.as_str()) else {
114 return JsonValue::Null;
115 };
116 let equals = args.get("equals").cloned().unwrap_or(JsonValue::Null);
117
118 items
119 .into_iter()
120 .find(|item| item.get(field).map(|v| v == &equals).unwrap_or(false))
121 .unwrap_or(JsonValue::Null)
122}
123
124pub fn reduce(args: &JsonValue) -> JsonValue {
125 let items = core_items(args);
126 let mode = args.get("mode").and_then(|v| v.as_str()).unwrap_or("last");
127
128 match mode {
129 "sum" => {
130 let sum = items
131 .iter()
132 .filter_map(|v| v.as_f64())
133 .fold(0.0, |acc, n| acc + n);
134 json!(sum)
135 }
136 "min" => items
137 .iter()
138 .filter_map(|v| v.as_f64())
139 .reduce(f64::min)
140 .map(|v| json!(v))
141 .unwrap_or(JsonValue::Null),
142 "max" => items
143 .iter()
144 .filter_map(|v| v.as_f64())
145 .reduce(f64::max)
146 .map(|v| json!(v))
147 .unwrap_or(JsonValue::Null),
148 "avg" => {
149 let nums = items.iter().filter_map(|v| v.as_f64()).collect::<Vec<_>>();
150 if nums.is_empty() {
151 JsonValue::Null
152 } else {
153 json!(nums.iter().sum::<f64>() / nums.len() as f64)
154 }
155 }
156 "concat" => {
157 let initial = args.get("initial").and_then(|v| v.as_str()).unwrap_or("");
158 let joined = items
159 .iter()
160 .map(core_scalar_to_key)
161 .collect::<Vec<_>>()
162 .join("");
163 json!(format!("{initial}{joined}"))
164 }
165 "count" => json!(items.len()),
166 "first" => items.first().cloned().unwrap_or(JsonValue::Null),
167 "last" => items.last().cloned().unwrap_or(JsonValue::Null),
168 _ => json!({
169 "error": format!("unsupported reduce mode '{mode}'"),
170 "supported_modes": ["sum", "min", "max", "avg", "concat", "count", "first", "last"]
171 }),
172 }
173}
174
175pub fn group_by(args: &JsonValue) -> JsonValue {
176 let items = core_items(args);
177 let Some(field) = args.get("field").and_then(|v| v.as_str()) else {
178 return JsonValue::Object(Map::new());
179 };
180
181 let mut grouped = Map::new();
182 for item in items {
183 let key = item
184 .get(field)
185 .map(core_scalar_to_key)
186 .unwrap_or_else(|| "null".to_string());
187
188 let entry = grouped
189 .entry(key)
190 .or_insert_with(|| JsonValue::Array(Vec::new()));
191 if let Some(values) = entry.as_array_mut() {
192 values.push(item);
193 }
194 }
195
196 JsonValue::Object(grouped)
197}
198
199pub fn merge(args: &JsonValue) -> JsonValue {
200 let left = args
201 .get("left")
202 .and_then(|v| v.as_object())
203 .cloned()
204 .or_else(|| args.get("__input").and_then(|v| v.as_object()).cloned())
205 .unwrap_or_default();
206
207 let right = args
208 .get("right")
209 .and_then(|v| v.as_object())
210 .cloned()
211 .unwrap_or_default();
212
213 let mut merged = left;
214 for (k, v) in right {
215 merged.insert(k, v);
216 }
217
218 JsonValue::Object(merged)
219}
220
221pub fn pick(args: &JsonValue) -> JsonValue {
222 let input = args
223 .get("input")
224 .and_then(|v| v.as_object())
225 .cloned()
226 .or_else(|| args.get("__input").and_then(|v| v.as_object()).cloned())
227 .unwrap_or_default();
228
229 let fields = args
230 .get("fields")
231 .and_then(|v| v.as_array())
232 .cloned()
233 .unwrap_or_default();
234
235 let mut picked = Map::new();
236 for field in fields.iter().filter_map(|v| v.as_str()) {
237 if let Some(value) = input.get(field) {
238 picked.insert(field.to_string(), value.clone());
239 }
240 }
241 JsonValue::Object(picked)
242}
243
244pub fn validate_schema(args: &JsonValue) -> JsonValue {
245 let required = args
246 .get("required")
247 .and_then(|v| v.as_array())
248 .cloned()
249 .unwrap_or_default();
250 let data = args
251 .get("data")
252 .and_then(|v| v.as_object())
253 .cloned()
254 .unwrap_or_default();
255
256 let missing = required
257 .iter()
258 .filter_map(|v| v.as_str())
259 .filter(|name| !data.contains_key(*name))
260 .map(|s| JsonValue::String(s.to_string()))
261 .collect::<Vec<_>>();
262
263 json!({
264 "ok": missing.is_empty(),
265 "missing": missing,
266 })
267}
268
269pub fn add(args: &JsonValue) -> JsonValue {
270 let Some(a) = core_arg_f64(args, "a") else {
271 return json!({ "error": "missing numeric arg: a" });
272 };
273 let Some(b) = core_arg_f64(args, "b") else {
274 return json!({ "error": "missing numeric arg: b" });
275 };
276 json!(a + b)
277}
278
279pub fn sub(args: &JsonValue) -> JsonValue {
280 let Some(a) = core_arg_f64(args, "a") else {
281 return json!({ "error": "missing numeric arg: a" });
282 };
283 let Some(b) = core_arg_f64(args, "b") else {
284 return json!({ "error": "missing numeric arg: b" });
285 };
286 json!(a - b)
287}
288
289pub fn inc(args: &JsonValue) -> JsonValue {
290 let value = core_arg_f64(args, "value")
291 .or_else(|| args.get("__input").and_then(|v| v.as_f64()))
292 .unwrap_or(0.0);
293 json!(value + 1.0)
294}
295
296pub fn dec(args: &JsonValue) -> JsonValue {
297 let value = core_arg_f64(args, "value")
298 .or_else(|| args.get("__input").and_then(|v| v.as_f64()))
299 .unwrap_or(0.0);
300 json!(value - 1.0)
301}
302
303pub fn eq(args: &JsonValue) -> JsonValue {
304 let a = args.get("a").cloned().unwrap_or(JsonValue::Null);
305 let b = args.get("b").cloned().unwrap_or(JsonValue::Null);
306 json!({ "value": a == b })
307}
308
309pub fn lt(args: &JsonValue) -> JsonValue {
310 compare_numeric(args, |a, b| a < b)
311}
312
313pub fn gt(args: &JsonValue) -> JsonValue {
314 compare_numeric(args, |a, b| a > b)
315}
316
317pub fn gte(args: &JsonValue) -> JsonValue {
318 compare_numeric(args, |a, b| a >= b)
319}
320
321pub fn lte(args: &JsonValue) -> JsonValue {
322 compare_numeric(args, |a, b| a <= b)
323}
324
325pub fn inc_field(args: &JsonValue) -> JsonValue {
326 bump_field(args, 1.0)
327}
328
329pub fn dec_field(args: &JsonValue) -> JsonValue {
330 bump_field(args, -1.0)
331}
332
333pub fn set_fields(args: &JsonValue) -> JsonValue {
334 let fields = args
335 .get("fields")
336 .and_then(|v| v.as_object())
337 .cloned()
338 .unwrap_or_default();
339
340 let mut input = args
341 .get("input")
342 .and_then(|v| v.as_object())
343 .cloned()
344 .or_else(|| args.get("__input").and_then(|v| v.as_object()).cloned())
345 .unwrap_or_default();
346
347 for (key, value) in fields {
348 input.insert(key, value);
349 }
350
351 JsonValue::Object(input)
352}
353
354pub fn split(args: &JsonValue) -> JsonValue {
355 let text = core_arg_text(args, "text");
356 let sep = args.get("sep").and_then(|v| v.as_str()).unwrap_or(",");
357
358 JsonValue::Array(
359 text.split(sep)
360 .map(|s| JsonValue::String(s.to_string()))
361 .collect(),
362 )
363}
364
365pub fn join(args: &JsonValue) -> JsonValue {
366 let sep = args.get("sep").and_then(|v| v.as_str()).unwrap_or(",");
367 let items = core_items(args);
368 let joined = items
369 .iter()
370 .map(core_scalar_to_key)
371 .collect::<Vec<_>>()
372 .join(sep);
373 json!({ "text": joined })
374}
375
376pub fn replace(args: &JsonValue) -> JsonValue {
377 let text = core_arg_text(args, "text");
378 let from = args.get("from").and_then(|v| v.as_str()).unwrap_or("");
379 let to = args.get("to").and_then(|v| v.as_str()).unwrap_or("");
380 json!({ "text": text.replace(from, to) })
381}
382
383pub fn trim(args: &JsonValue) -> JsonValue {
384 let text = core_arg_text(args, "text");
385 json!({ "text": text.trim() })
386}
387
388pub fn lower(args: &JsonValue) -> JsonValue {
389 let text = core_arg_text(args, "text");
390 json!({ "text": text.to_lowercase() })
391}
392
393pub fn upper(args: &JsonValue) -> JsonValue {
394 let text = core_arg_text(args, "text");
395 json!({ "text": text.to_uppercase() })
396}
397
398pub fn contains(args: &JsonValue) -> JsonValue {
399 let haystack = args
400 .get("haystack")
401 .cloned()
402 .or_else(|| args.get("__input").cloned())
403 .unwrap_or(JsonValue::Null);
404 let needle = args.get("needle").cloned().unwrap_or(JsonValue::Null);
405
406 let contains = match &haystack {
407 JsonValue::String(s) => needle.as_str().map(|n| s.contains(n)).unwrap_or(false),
408 JsonValue::Array(items) => items.iter().any(|item| item == &needle),
409 JsonValue::Object(map) => needle
410 .as_str()
411 .map(|k| map.contains_key(k))
412 .unwrap_or(false),
413 _ => false,
414 };
415
416 json!({ "contains": contains })
417}
418
419pub fn get_path(args: &JsonValue) -> JsonValue {
420 let path = args.get("path").and_then(|v| v.as_str()).unwrap_or("");
421 let input = args
422 .get("input")
423 .cloned()
424 .or_else(|| args.get("__input").cloned())
425 .unwrap_or(JsonValue::Null);
426
427 json_get_path_value(&input, path).unwrap_or(JsonValue::Null)
428}
429
430pub fn set_path(args: &JsonValue) -> JsonValue {
431 let path = args.get("path").and_then(|v| v.as_str()).unwrap_or("");
432 let value = args.get("value").cloned().unwrap_or(JsonValue::Null);
433 let input = args
434 .get("input")
435 .cloned()
436 .or_else(|| args.get("__input").cloned())
437 .unwrap_or_else(|| JsonValue::Object(Map::new()));
438
439 json_set_path_value(&input, path, value)
440}
441
442pub fn has_path(args: &JsonValue) -> JsonValue {
443 let path = args.get("path").and_then(|v| v.as_str()).unwrap_or("");
444 let input = args
445 .get("input")
446 .cloned()
447 .or_else(|| args.get("__input").cloned())
448 .unwrap_or(JsonValue::Null);
449
450 json!({ "has_path": json_get_path_value(&input, path).is_some() })
451}
452
453fn core_items(args: &JsonValue) -> Vec<JsonValue> {
454 args.get("items")
455 .and_then(|v| v.as_array())
456 .cloned()
457 .or_else(|| args.get("__input").and_then(|v| v.as_array()).cloned())
458 .unwrap_or_default()
459}
460
461fn core_scalar_to_key(value: &JsonValue) -> String {
462 match value {
463 JsonValue::String(s) => s.clone(),
464 JsonValue::Number(n) => n.to_string(),
465 JsonValue::Bool(b) => b.to_string(),
466 JsonValue::Null => "null".to_string(),
467 _ => serde_json::to_string(value).unwrap_or_else(|_| "<value>".to_string()),
468 }
469}
470
471fn json_get_path_value(input: &JsonValue, path: &str) -> Option<JsonValue> {
472 let segments = path
473 .split('.')
474 .filter(|s| !s.is_empty())
475 .collect::<Vec<_>>();
476 if segments.is_empty() {
477 return Some(input.clone());
478 }
479
480 let mut current = input;
481 for segment in segments {
482 current = match current {
483 JsonValue::Object(map) => map.get(segment)?,
484 JsonValue::Array(items) => {
485 let idx = segment.parse::<usize>().ok()?;
486 items.get(idx)?
487 }
488 _ => return None,
489 };
490 }
491
492 Some(current.clone())
493}
494
495fn json_set_path_value(input: &JsonValue, path: &str, value: JsonValue) -> JsonValue {
496 let segments = path
497 .split('.')
498 .filter(|s| !s.is_empty())
499 .collect::<Vec<_>>();
500 if segments.is_empty() {
501 return input.clone();
502 }
503
504 let mut output = if input.is_object() {
505 input.clone()
506 } else {
507 JsonValue::Object(Map::new())
508 };
509
510 set_path_recursive(&mut output, &segments, value);
511 output
512}
513
514fn set_path_recursive(node: &mut JsonValue, segments: &[&str], value: JsonValue) {
515 if segments.is_empty() {
516 return;
517 }
518
519 if segments.len() == 1 {
520 if !node.is_object() {
521 *node = JsonValue::Object(Map::new());
522 }
523 if let Some(map) = node.as_object_mut() {
524 map.insert(segments[0].to_string(), value);
525 }
526 return;
527 }
528
529 if !node.is_object() {
530 *node = JsonValue::Object(Map::new());
531 }
532
533 if let Some(map) = node.as_object_mut() {
534 let child = map
535 .entry(segments[0].to_string())
536 .or_insert_with(|| JsonValue::Object(Map::new()));
537 set_path_recursive(child, &segments[1..], value);
538 }
539}
540
541fn core_arg_text(args: &JsonValue, key: &str) -> String {
542 args.get(key)
543 .and_then(|v| v.as_str())
544 .map(ToOwned::to_owned)
545 .or_else(|| {
546 args.get("__input")
547 .and_then(|v| v.as_str())
548 .map(ToOwned::to_owned)
549 })
550 .unwrap_or_default()
551}
552
553fn core_arg_message(args: &JsonValue, key: &str) -> String {
554 args.get(key).map(core_scalar_to_key).unwrap_or_default()
555}
556
557fn core_arg_f64(args: &JsonValue, key: &str) -> Option<f64> {
558 args.get(key).and_then(|v| {
559 v.as_f64()
560 .or_else(|| v.as_str().and_then(|s| s.parse::<f64>().ok()))
561 })
562}
563
564fn compare_numeric(args: &JsonValue, pred: fn(f64, f64) -> bool) -> JsonValue {
565 let Some(a) = core_arg_f64(args, "a") else {
566 return json!({ "error": "missing numeric arg: a" });
567 };
568 let Some(b) = core_arg_f64(args, "b") else {
569 return json!({ "error": "missing numeric arg: b" });
570 };
571 json!({ "value": pred(a, b) })
572}
573
574fn bump_field(args: &JsonValue, delta: f64) -> JsonValue {
575 let field = args.get("field").and_then(|v| v.as_str()).unwrap_or("");
576 if field.is_empty() {
577 return json!({ "error": "missing arg: field" });
578 }
579
580 let mut input = args
581 .get("input")
582 .and_then(|v| v.as_object())
583 .cloned()
584 .or_else(|| args.get("__input").and_then(|v| v.as_object()).cloned())
585 .unwrap_or_default();
586
587 let current = input.get(field).and_then(|v| v.as_f64()).unwrap_or(0.0);
588 input.insert(field.to_string(), json!(current + delta));
589 JsonValue::Object(input)
590}
591
592#[cfg(test)]
593mod tests {
594 use super::*;
595
596 #[test]
597 fn echo_accepts_numeric_message_values() {
598 let out = echo(&json!({ "message": 42 }));
599 assert_eq!(out, json!({ "message": "42" }));
600 }
601
602 #[test]
603 fn echo_accepts_object_message_values() {
604 let out = echo(&json!({ "message": { "a": 1, "b": true } }));
605 assert_eq!(out, json!({ "message": "{\"a\":1,\"b\":true}" }));
606 }
607
608 #[test]
609 fn tap_preserves_input_when_present() {
610 let out = tap(&json!({ "__input": { "value": 1 }, "message": "ignored" }));
611 assert_eq!(out, json!({ "value": 1 }));
612 }
613}