1use serde_json::{json, Map, Value as JsonValue};
2
3pub fn echo(args: &JsonValue) -> JsonValue {
4 let message = core_arg_message(args, "message");
5 json!({ "message": message })
6}
7
8pub fn tap(args: &JsonValue) -> JsonValue {
9 args.get("__input").cloned().unwrap_or_else(|| {
10 let message = core_arg_message(args, "message");
11 serde_json::json!({ "message": message })
12 })
13}
14
15pub fn pack_state_data(args: &JsonValue) -> JsonValue {
16 let state = args.get("state").cloned().unwrap_or(JsonValue::Null);
17 let data = args
18 .get("data")
19 .cloned()
20 .or_else(|| args.get("__input").cloned())
21 .unwrap_or(JsonValue::Null);
22
23 serde_json::json!({
24 "state": state,
25 "data": data,
26 })
27}
28
29pub fn get_state(args: &JsonValue) -> JsonValue {
30 args.get("input")
31 .or_else(|| args.get("__input"))
32 .and_then(|v| v.get("state"))
33 .cloned()
34 .unwrap_or(JsonValue::Null)
35}
36
37pub fn get_data(args: &JsonValue) -> JsonValue {
38 args.get("input")
39 .or_else(|| args.get("__input"))
40 .and_then(|v| v.get("data"))
41 .cloned()
42 .unwrap_or(JsonValue::Null)
43}
44
45pub fn apply_lane(args: &JsonValue) -> JsonValue {
46 let lane = args.get("lane").and_then(|v| v.as_str()).unwrap_or("");
47 let fields = args
48 .get("fields")
49 .and_then(|v| v.as_object())
50 .cloned()
51 .unwrap_or_default();
52
53 let input = args
54 .get("input")
55 .cloned()
56 .or_else(|| args.get("__input").cloned())
57 .unwrap_or(JsonValue::Null);
58
59 let mut out = match input {
60 JsonValue::Object(map) => map,
61 other => {
62 let mut map = Map::new();
63 map.insert("data".to_string(), other);
64 map
65 }
66 };
67
68 let mut lane_obj = out
69 .get(lane)
70 .and_then(|v| v.as_object())
71 .cloned()
72 .unwrap_or_default();
73
74 for (k, v) in fields {
75 lane_obj.insert(k, v);
76 }
77
78 out.insert(lane.to_string(), JsonValue::Object(lane_obj));
79 JsonValue::Object(out)
80}
81
82pub fn map(args: &JsonValue) -> JsonValue {
83 let items = core_items(args);
84 let Some(field) = args.get("field").and_then(|v| v.as_str()) else {
85 return JsonValue::Array(items);
86 };
87
88 JsonValue::Array(
89 items
90 .into_iter()
91 .map(|item| item.get(field).cloned().unwrap_or(JsonValue::Null))
92 .collect(),
93 )
94}
95
96pub fn filter(args: &JsonValue) -> JsonValue {
97 let items = core_items(args);
98 let Some(field) = args.get("field").and_then(|v| v.as_str()) else {
99 return JsonValue::Array(items);
100 };
101 let equals = args.get("equals").cloned().unwrap_or(JsonValue::Null);
102
103 JsonValue::Array(
104 items
105 .into_iter()
106 .filter(|item| item.get(field).map(|v| v == &equals).unwrap_or(false))
107 .collect(),
108 )
109}
110
111pub fn find(args: &JsonValue) -> JsonValue {
112 let items = core_items(args);
113 let Some(field) = args.get("field").and_then(|v| v.as_str()) else {
114 return JsonValue::Null;
115 };
116 let equals = args.get("equals").cloned().unwrap_or(JsonValue::Null);
117
118 items
119 .into_iter()
120 .find(|item| item.get(field).map(|v| v == &equals).unwrap_or(false))
121 .unwrap_or(JsonValue::Null)
122}
123
124pub fn reduce(args: &JsonValue) -> JsonValue {
125 let items = core_items(args);
126 let mode = args.get("mode").and_then(|v| v.as_str()).unwrap_or("last");
127
128 match mode {
129 "sum" => {
130 let sum = items
131 .iter()
132 .filter_map(|v| v.as_f64())
133 .fold(0.0, |acc, n| acc + n);
134 json!(sum)
135 }
136 "min" => items
137 .iter()
138 .filter_map(|v| v.as_f64())
139 .reduce(f64::min)
140 .map(|v| json!(v))
141 .unwrap_or(JsonValue::Null),
142 "max" => items
143 .iter()
144 .filter_map(|v| v.as_f64())
145 .reduce(f64::max)
146 .map(|v| json!(v))
147 .unwrap_or(JsonValue::Null),
148 "avg" => {
149 let nums = items.iter().filter_map(|v| v.as_f64()).collect::<Vec<_>>();
150 if nums.is_empty() {
151 JsonValue::Null
152 } else {
153 json!(nums.iter().sum::<f64>() / nums.len() as f64)
154 }
155 }
156 "concat" => {
157 let initial = args
158 .get("initial")
159 .and_then(|v| v.as_str())
160 .unwrap_or("");
161 let joined = items
162 .iter()
163 .map(core_scalar_to_key)
164 .collect::<Vec<_>>()
165 .join("");
166 json!(format!("{initial}{joined}"))
167 }
168 "count" => json!(items.len()),
169 "first" => items.first().cloned().unwrap_or(JsonValue::Null),
170 "last" => items.last().cloned().unwrap_or(JsonValue::Null),
171 _ => json!({
172 "error": format!("unsupported reduce mode '{mode}'"),
173 "supported_modes": ["sum", "min", "max", "avg", "concat", "count", "first", "last"]
174 }),
175 }
176}
177
178pub fn group_by(args: &JsonValue) -> JsonValue {
179 let items = core_items(args);
180 let Some(field) = args.get("field").and_then(|v| v.as_str()) else {
181 return JsonValue::Object(Map::new());
182 };
183
184 let mut grouped = Map::new();
185 for item in items {
186 let key = item
187 .get(field)
188 .map(core_scalar_to_key)
189 .unwrap_or_else(|| "null".to_string());
190
191 let entry = grouped
192 .entry(key)
193 .or_insert_with(|| JsonValue::Array(Vec::new()));
194 if let Some(values) = entry.as_array_mut() {
195 values.push(item);
196 }
197 }
198
199 JsonValue::Object(grouped)
200}
201
202pub fn merge(args: &JsonValue) -> JsonValue {
203 let left = args
204 .get("left")
205 .and_then(|v| v.as_object())
206 .cloned()
207 .or_else(|| args.get("__input").and_then(|v| v.as_object()).cloned())
208 .unwrap_or_default();
209
210 let right = args
211 .get("right")
212 .and_then(|v| v.as_object())
213 .cloned()
214 .unwrap_or_default();
215
216 let mut merged = left;
217 for (k, v) in right {
218 merged.insert(k, v);
219 }
220
221 JsonValue::Object(merged)
222}
223
224pub fn pick(args: &JsonValue) -> JsonValue {
225 let input = args
226 .get("input")
227 .and_then(|v| v.as_object())
228 .cloned()
229 .or_else(|| args.get("__input").and_then(|v| v.as_object()).cloned())
230 .unwrap_or_default();
231
232 let fields = args
233 .get("fields")
234 .and_then(|v| v.as_array())
235 .cloned()
236 .unwrap_or_default();
237
238 let mut picked = Map::new();
239 for field in fields.iter().filter_map(|v| v.as_str()) {
240 if let Some(value) = input.get(field) {
241 picked.insert(field.to_string(), value.clone());
242 }
243 }
244 JsonValue::Object(picked)
245}
246
247pub fn validate_schema(args: &JsonValue) -> JsonValue {
248 let required = args
249 .get("required")
250 .and_then(|v| v.as_array())
251 .cloned()
252 .unwrap_or_default();
253 let data = args
254 .get("data")
255 .and_then(|v| v.as_object())
256 .cloned()
257 .unwrap_or_default();
258
259 let missing = required
260 .iter()
261 .filter_map(|v| v.as_str())
262 .filter(|name| !data.contains_key(*name))
263 .map(|s| JsonValue::String(s.to_string()))
264 .collect::<Vec<_>>();
265
266 json!({
267 "ok": missing.is_empty(),
268 "missing": missing,
269 })
270}
271
272pub fn add(args: &JsonValue) -> JsonValue {
273 let Some(a) = core_arg_f64(args, "a") else {
274 return json!({ "error": "missing numeric arg: a" });
275 };
276 let Some(b) = core_arg_f64(args, "b") else {
277 return json!({ "error": "missing numeric arg: b" });
278 };
279 json!(a + b)
280}
281
282pub fn sub(args: &JsonValue) -> JsonValue {
283 let Some(a) = core_arg_f64(args, "a") else {
284 return json!({ "error": "missing numeric arg: a" });
285 };
286 let Some(b) = core_arg_f64(args, "b") else {
287 return json!({ "error": "missing numeric arg: b" });
288 };
289 json!(a - b)
290}
291
292pub fn inc(args: &JsonValue) -> JsonValue {
293 let value = core_arg_f64(args, "value")
294 .or_else(|| args.get("__input").and_then(|v| v.as_f64()))
295 .unwrap_or(0.0);
296 json!(value + 1.0)
297}
298
299pub fn dec(args: &JsonValue) -> JsonValue {
300 let value = core_arg_f64(args, "value")
301 .or_else(|| args.get("__input").and_then(|v| v.as_f64()))
302 .unwrap_or(0.0);
303 json!(value - 1.0)
304}
305
306pub fn eq(args: &JsonValue) -> JsonValue {
307 let a = args.get("a").cloned().unwrap_or(JsonValue::Null);
308 let b = args.get("b").cloned().unwrap_or(JsonValue::Null);
309 json!({ "value": a == b })
310}
311
312pub fn lt(args: &JsonValue) -> JsonValue {
313 compare_numeric(args, |a, b| a < b)
314}
315
316pub fn gt(args: &JsonValue) -> JsonValue {
317 compare_numeric(args, |a, b| a > b)
318}
319
320pub fn gte(args: &JsonValue) -> JsonValue {
321 compare_numeric(args, |a, b| a >= b)
322}
323
324pub fn lte(args: &JsonValue) -> JsonValue {
325 compare_numeric(args, |a, b| a <= b)
326}
327
328pub fn inc_field(args: &JsonValue) -> JsonValue {
329 bump_field(args, 1.0)
330}
331
332pub fn dec_field(args: &JsonValue) -> JsonValue {
333 bump_field(args, -1.0)
334}
335
336pub fn set_fields(args: &JsonValue) -> JsonValue {
337 let fields = args
338 .get("fields")
339 .and_then(|v| v.as_object())
340 .cloned()
341 .unwrap_or_default();
342
343 let mut input = args
344 .get("input")
345 .and_then(|v| v.as_object())
346 .cloned()
347 .or_else(|| args.get("__input").and_then(|v| v.as_object()).cloned())
348 .unwrap_or_default();
349
350 for (key, value) in fields {
351 input.insert(key, value);
352 }
353
354 JsonValue::Object(input)
355}
356
357pub fn split(args: &JsonValue) -> JsonValue {
358 let text = core_arg_text(args, "text");
359 let sep = args.get("sep").and_then(|v| v.as_str()).unwrap_or(",");
360
361 JsonValue::Array(
362 text.split(sep)
363 .map(|s| JsonValue::String(s.to_string()))
364 .collect(),
365 )
366}
367
368pub fn join(args: &JsonValue) -> JsonValue {
369 let sep = args.get("sep").and_then(|v| v.as_str()).unwrap_or(",");
370 let items = core_items(args);
371 let joined = items
372 .iter()
373 .map(core_scalar_to_key)
374 .collect::<Vec<_>>()
375 .join(sep);
376 json!({ "text": joined })
377}
378
379pub fn replace(args: &JsonValue) -> JsonValue {
380 let text = core_arg_text(args, "text");
381 let from = args.get("from").and_then(|v| v.as_str()).unwrap_or("");
382 let to = args.get("to").and_then(|v| v.as_str()).unwrap_or("");
383 json!({ "text": text.replace(from, to) })
384}
385
386pub fn trim(args: &JsonValue) -> JsonValue {
387 let text = core_arg_text(args, "text");
388 json!({ "text": text.trim() })
389}
390
391pub fn lower(args: &JsonValue) -> JsonValue {
392 let text = core_arg_text(args, "text");
393 json!({ "text": text.to_lowercase() })
394}
395
396pub fn upper(args: &JsonValue) -> JsonValue {
397 let text = core_arg_text(args, "text");
398 json!({ "text": text.to_uppercase() })
399}
400
401pub fn contains(args: &JsonValue) -> JsonValue {
402 let haystack = args
403 .get("haystack")
404 .cloned()
405 .or_else(|| args.get("__input").cloned())
406 .unwrap_or(JsonValue::Null);
407 let needle = args.get("needle").cloned().unwrap_or(JsonValue::Null);
408
409 let contains = match &haystack {
410 JsonValue::String(s) => needle.as_str().map(|n| s.contains(n)).unwrap_or(false),
411 JsonValue::Array(items) => items.iter().any(|item| item == &needle),
412 JsonValue::Object(map) => needle.as_str().map(|k| map.contains_key(k)).unwrap_or(false),
413 _ => false,
414 };
415
416 json!({ "contains": contains })
417}
418
419pub fn get_path(args: &JsonValue) -> JsonValue {
420 let path = args.get("path").and_then(|v| v.as_str()).unwrap_or("");
421 let input = args
422 .get("input")
423 .cloned()
424 .or_else(|| args.get("__input").cloned())
425 .unwrap_or(JsonValue::Null);
426
427 json_get_path_value(&input, path).unwrap_or(JsonValue::Null)
428}
429
430pub fn set_path(args: &JsonValue) -> JsonValue {
431 let path = args.get("path").and_then(|v| v.as_str()).unwrap_or("");
432 let value = args.get("value").cloned().unwrap_or(JsonValue::Null);
433 let input = args
434 .get("input")
435 .cloned()
436 .or_else(|| args.get("__input").cloned())
437 .unwrap_or_else(|| JsonValue::Object(Map::new()));
438
439 json_set_path_value(&input, path, value)
440}
441
442pub fn has_path(args: &JsonValue) -> JsonValue {
443 let path = args.get("path").and_then(|v| v.as_str()).unwrap_or("");
444 let input = args
445 .get("input")
446 .cloned()
447 .or_else(|| args.get("__input").cloned())
448 .unwrap_or(JsonValue::Null);
449
450 json!({ "has_path": json_get_path_value(&input, path).is_some() })
451}
452
453fn core_items(args: &JsonValue) -> Vec<JsonValue> {
454 args.get("items")
455 .and_then(|v| v.as_array())
456 .cloned()
457 .or_else(|| args.get("__input").and_then(|v| v.as_array()).cloned())
458 .unwrap_or_default()
459}
460
461fn core_scalar_to_key(value: &JsonValue) -> String {
462 match value {
463 JsonValue::String(s) => s.clone(),
464 JsonValue::Number(n) => n.to_string(),
465 JsonValue::Bool(b) => b.to_string(),
466 JsonValue::Null => "null".to_string(),
467 _ => serde_json::to_string(value).unwrap_or_else(|_| "<value>".to_string()),
468 }
469}
470
471fn json_get_path_value(input: &JsonValue, path: &str) -> Option<JsonValue> {
472 let segments = path.split('.').filter(|s| !s.is_empty()).collect::<Vec<_>>();
473 if segments.is_empty() {
474 return Some(input.clone());
475 }
476
477 let mut current = input;
478 for segment in segments {
479 current = match current {
480 JsonValue::Object(map) => map.get(segment)?,
481 JsonValue::Array(items) => {
482 let idx = segment.parse::<usize>().ok()?;
483 items.get(idx)?
484 }
485 _ => return None,
486 };
487 }
488
489 Some(current.clone())
490}
491
492fn json_set_path_value(input: &JsonValue, path: &str, value: JsonValue) -> JsonValue {
493 let segments = path.split('.').filter(|s| !s.is_empty()).collect::<Vec<_>>();
494 if segments.is_empty() {
495 return input.clone();
496 }
497
498 let mut output = if input.is_object() {
499 input.clone()
500 } else {
501 JsonValue::Object(Map::new())
502 };
503
504 set_path_recursive(&mut output, &segments, value);
505 output
506}
507
508fn set_path_recursive(node: &mut JsonValue, segments: &[&str], value: JsonValue) {
509 if segments.is_empty() {
510 return;
511 }
512
513 if segments.len() == 1 {
514 if !node.is_object() {
515 *node = JsonValue::Object(Map::new());
516 }
517 if let Some(map) = node.as_object_mut() {
518 map.insert(segments[0].to_string(), value);
519 }
520 return;
521 }
522
523 if !node.is_object() {
524 *node = JsonValue::Object(Map::new());
525 }
526
527 if let Some(map) = node.as_object_mut() {
528 let child = map
529 .entry(segments[0].to_string())
530 .or_insert_with(|| JsonValue::Object(Map::new()));
531 set_path_recursive(child, &segments[1..], value);
532 }
533}
534
535fn core_arg_text(args: &JsonValue, key: &str) -> String {
536 args.get(key)
537 .and_then(|v| v.as_str())
538 .map(ToOwned::to_owned)
539 .or_else(|| args.get("__input").and_then(|v| v.as_str()).map(ToOwned::to_owned))
540 .unwrap_or_default()
541}
542
543 fn core_arg_message(args: &JsonValue, key: &str) -> String {
544 args.get(key)
545 .map(core_scalar_to_key)
546 .unwrap_or_default()
547 }
548
549fn core_arg_f64(args: &JsonValue, key: &str) -> Option<f64> {
550 args.get(key)
551 .and_then(|v| v.as_f64().or_else(|| v.as_str().and_then(|s| s.parse::<f64>().ok())))
552}
553
554fn compare_numeric(args: &JsonValue, pred: fn(f64, f64) -> bool) -> JsonValue {
555 let Some(a) = core_arg_f64(args, "a") else {
556 return json!({ "error": "missing numeric arg: a" });
557 };
558 let Some(b) = core_arg_f64(args, "b") else {
559 return json!({ "error": "missing numeric arg: b" });
560 };
561 json!({ "value": pred(a, b) })
562}
563
564fn bump_field(args: &JsonValue, delta: f64) -> JsonValue {
565 let field = args.get("field").and_then(|v| v.as_str()).unwrap_or("");
566 if field.is_empty() {
567 return json!({ "error": "missing arg: field" });
568 }
569
570 let mut input = args
571 .get("input")
572 .and_then(|v| v.as_object())
573 .cloned()
574 .or_else(|| args.get("__input").and_then(|v| v.as_object()).cloned())
575 .unwrap_or_default();
576
577 let current = input.get(field).and_then(|v| v.as_f64()).unwrap_or(0.0);
578 input.insert(field.to_string(), json!(current + delta));
579 JsonValue::Object(input)
580}
581
582#[cfg(test)]
583mod tests {
584 use super::*;
585
586 #[test]
587 fn echo_accepts_numeric_message_values() {
588 let out = echo(&json!({ "message": 42 }));
589 assert_eq!(out, json!({ "message": "42" }));
590 }
591
592 #[test]
593 fn echo_accepts_object_message_values() {
594 let out = echo(&json!({ "message": { "a": 1, "b": true } }));
595 assert_eq!(out, json!({ "message": "{\"a\":1,\"b\":true}" }));
596 }
597
598 #[test]
599 fn tap_preserves_input_when_present() {
600 let out = tap(&json!({ "__input": { "value": 1 }, "message": "ignored" }));
601 assert_eq!(out, json!({ "value": 1 }));
602 }
603}