1use std::collections::{HashMap, HashSet};
56
57use async_trait::async_trait;
58use serde_json::{json, Value};
59
60use crate::condition::{get_path, CondOp};
61use crate::error::{FlowError, Result};
62use crate::node::{ExecContext, Node};
63
64pub struct ListOperatorNode;
66
67#[async_trait]
68impl Node for ListOperatorNode {
69 fn node_type(&self) -> &str {
70 "list-operator"
71 }
72
73 async fn execute(&self, ctx: ExecContext) -> Result<Value> {
74 let input_selector = ctx.data["input_selector"].as_str().ok_or_else(|| {
76 FlowError::InvalidDefinition("list-operator: missing data.input_selector".into())
77 })?;
78
79 let mut items = resolve_input_array(&ctx.inputs, input_selector)?;
80
81 if !ctx.data["filter"].is_null() {
83 items = apply_filter(items, &ctx.data["filter"])?;
84 }
85 if let Some(key) = ctx.data["sort_by"].as_str() {
86 let order = ctx.data["sort_order"].as_str().unwrap_or("asc");
87 items = apply_sort(items, key, order);
88 }
89 if let Some(key) = ctx.data["deduplicate_by"].as_str() {
90 items = apply_deduplicate(items, key);
91 }
92 if let Some(n) = ctx.data["limit"].as_u64() {
93 items.truncate(n as usize);
94 }
95
96 Ok(json!({ "output": items }))
97 }
98}
99
100fn resolve_input_array(inputs: &HashMap<String, Value>, selector: &str) -> Result<Vec<Value>> {
104 let (node_id, rest) = match selector.find('.') {
105 Some(pos) => (&selector[..pos], &selector[pos + 1..]),
106 None => (selector, ""),
107 };
108
109 let node_out = inputs.get(node_id).ok_or_else(|| {
110 FlowError::InvalidDefinition(format!(
111 "list-operator: input_selector '{selector}' references unknown node '{node_id}'"
112 ))
113 })?;
114
115 let value = if rest.is_empty() {
116 node_out
117 } else {
118 get_path(node_out, rest).ok_or_else(|| {
119 FlowError::InvalidDefinition(format!(
120 "list-operator: path '{rest}' not found in node '{node_id}' output"
121 ))
122 })?
123 };
124
125 value
126 .as_array()
127 .ok_or_else(|| {
128 FlowError::InvalidDefinition(format!(
129 "list-operator: input_selector '{selector}' must point to a JSON array"
130 ))
131 })
132 .map(|a| a.clone())
133}
134
135fn apply_filter(items: Vec<Value>, filter: &Value) -> Result<Vec<Value>> {
137 let path = filter["path"].as_str().unwrap_or("");
138 let op: CondOp = serde_json::from_value(filter["op"].clone()).map_err(|e| {
139 FlowError::InvalidDefinition(format!("list-operator: invalid filter.op: {e}"))
140 })?;
141 let expected = &filter["value"];
142
143 let mut out = Vec::with_capacity(items.len());
144 for item in items {
145 let actual = if path.is_empty() {
146 &item
147 } else {
148 match get_path(&item, path) {
149 Some(v) => v,
150 None => continue, }
152 };
153 if compare_values(actual, &op, expected) {
154 out.push(item);
155 }
156 }
157 Ok(out)
158}
159
160fn apply_sort(mut items: Vec<Value>, key: &str, order: &str) -> Vec<Value> {
165 let descending = order == "desc";
166 items.sort_by(|a, b| {
167 let av = if key.is_empty() {
168 Some(a)
169 } else {
170 get_path(a, key)
171 };
172 let bv = if key.is_empty() {
173 Some(b)
174 } else {
175 get_path(b, key)
176 };
177 let ord = compare_sort_values(av, bv);
178 if descending {
179 ord.reverse()
180 } else {
181 ord
182 }
183 });
184 items
185}
186
187fn apply_deduplicate(items: Vec<Value>, key: &str) -> Vec<Value> {
191 let mut seen: HashSet<String> = HashSet::new();
192 let mut out = Vec::with_capacity(items.len());
193 for item in items {
194 let fingerprint = if key.is_empty() {
195 item.to_string()
196 } else {
197 get_path(&item, key)
198 .map(|v| v.to_string())
199 .unwrap_or_default()
200 };
201 if seen.insert(fingerprint) {
202 out.push(item);
203 }
204 }
205 out
206}
207
208fn compare_values(actual: &Value, op: &CondOp, expected: &Value) -> bool {
211 match op {
212 CondOp::Eq => actual == expected,
213 CondOp::Ne => actual != expected,
214 CondOp::Gt => numeric_cmp(actual, expected)
215 .map(|o| o.is_gt())
216 .unwrap_or(false),
217 CondOp::Lt => numeric_cmp(actual, expected)
218 .map(|o| o.is_lt())
219 .unwrap_or(false),
220 CondOp::Gte => numeric_cmp(actual, expected)
221 .map(|o| o.is_ge())
222 .unwrap_or(false),
223 CondOp::Lte => numeric_cmp(actual, expected)
224 .map(|o| o.is_le())
225 .unwrap_or(false),
226 CondOp::Contains => match (actual, expected) {
227 (Value::String(s), Value::String(sub)) => s.contains(sub.as_str()),
228 (Value::Array(arr), v) => arr.contains(v),
229 _ => false,
230 },
231 }
232}
233
234fn numeric_cmp(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
235 a.as_f64()?.partial_cmp(&b.as_f64()?)
236}
237
238fn compare_sort_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
239 use std::cmp::Ordering;
240 match (a, b) {
241 (None, None) => Ordering::Equal,
242 (None, Some(_)) => Ordering::Greater, (Some(_), None) => Ordering::Less,
244 (Some(av), Some(bv)) => {
245 if let (Some(an), Some(bn)) = (av.as_f64(), bv.as_f64()) {
247 return an.partial_cmp(&bn).unwrap_or(Ordering::Equal);
248 }
249 if let (Some(as_), Some(bs)) = (av.as_str(), bv.as_str()) {
251 return as_.cmp(bs);
252 }
253 av.to_string().cmp(&bv.to_string())
255 }
256 }
257}
258
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a context with `data` as node config and `array` as the only upstream
    /// output, published under `input_node`.
    fn ctx_with(data: Value, input_node: &str, array: Value) -> ExecContext {
        ExecContext {
            data,
            inputs: HashMap::from([(input_node.to_string(), array)]),
            ..Default::default()
        }
    }

    /// Executes the node with `array` published as upstream node `src`.
    async fn run(data: Value, array: Value) -> Result<Value> {
        ListOperatorNode.execute(ctx_with(data, "src", array)).await
    }

    /// Executes the node, expecting success, and returns its `output` field.
    async fn output(data: Value, array: Value) -> Value {
        let mut result = run(data, array).await.expect("list-operator should succeed");
        result["output"].take()
    }

    // ---- input resolution ----

    #[tokio::test]
    async fn resolves_root_array() {
        let out = output(json!({ "input_selector": "src" }), json!([1, 2, 3])).await;
        assert_eq!(out, json!([1, 2, 3]));
    }

    #[tokio::test]
    async fn resolves_nested_array() {
        let out = output(
            json!({ "input_selector": "src.items" }),
            json!({ "items": [4, 5, 6] }),
        )
        .await;
        assert_eq!(out, json!([4, 5, 6]));
    }

    #[tokio::test]
    async fn rejects_missing_input_selector() {
        let ctx = ExecContext {
            data: json!({}),
            ..Default::default()
        };
        let err = ListOperatorNode.execute(ctx).await.unwrap_err();
        assert!(matches!(err, FlowError::InvalidDefinition(_)));
    }

    #[tokio::test]
    async fn rejects_non_array_input() {
        let err = run(json!({ "input_selector": "src" }), json!("not an array"))
            .await
            .unwrap_err();
        assert!(matches!(err, FlowError::InvalidDefinition(_)));
    }

    // ---- filter ----

    #[tokio::test]
    async fn filter_eq_keeps_matching_items() {
        let out = output(
            json!({
                "input_selector": "src",
                "filter": { "path": "active", "op": "eq", "value": true }
            }),
            json!([
                { "name": "Alice", "active": true },
                { "name": "Bob", "active": false },
                { "name": "Carol", "active": true }
            ]),
        )
        .await;
        let kept = out.as_array().unwrap();
        assert_eq!(kept.len(), 2);
        assert_eq!(kept[0]["name"], json!("Alice"));
        assert_eq!(kept[1]["name"], json!("Carol"));
    }

    #[tokio::test]
    async fn filter_gt_keeps_numeric_matches() {
        let out = output(
            json!({
                "input_selector": "src",
                "filter": { "path": "score", "op": "gt", "value": 5 }
            }),
            json!([
                { "score": 3 },
                { "score": 7 },
                { "score": 10 }
            ]),
        )
        .await;
        assert_eq!(out.as_array().unwrap().len(), 2);
    }

    #[tokio::test]
    async fn filter_contains_string() {
        let out = output(
            json!({
                "input_selector": "src",
                "filter": { "path": "tag", "op": "contains", "value": "rust" }
            }),
            json!([
                { "tag": "rust-2024" },
                { "tag": "python" },
                { "tag": "rust-async" }
            ]),
        )
        .await;
        assert_eq!(out.as_array().unwrap().len(), 2);
    }

    #[tokio::test]
    async fn filter_on_missing_path_excludes_item() {
        let out = output(
            json!({
                "input_selector": "src",
                "filter": { "path": "missing_field", "op": "eq", "value": true }
            }),
            json!([{ "x": 1 }, { "x": 2 }]),
        )
        .await;
        assert_eq!(out, json!([]));
    }

    // ---- sort ----

    #[tokio::test]
    async fn sort_strings_ascending() {
        let out = output(
            json!({ "input_selector": "src", "sort_by": "name" }),
            json!([{ "name": "Charlie" }, { "name": "Alice" }, { "name": "Bob" }]),
        )
        .await;
        let names: Vec<&str> = out
            .as_array()
            .unwrap()
            .iter()
            .map(|item| item["name"].as_str().unwrap())
            .collect();
        assert_eq!(names, ["Alice", "Bob", "Charlie"]);
    }

    #[tokio::test]
    async fn sort_numbers_descending() {
        let out = output(
            json!({ "input_selector": "src", "sort_by": "score", "sort_order": "desc" }),
            json!([{ "score": 3 }, { "score": 9 }, { "score": 1 }]),
        )
        .await;
        let scores: Vec<i64> = out
            .as_array()
            .unwrap()
            .iter()
            .map(|item| item["score"].as_i64().unwrap())
            .collect();
        assert_eq!(scores, [9, 3, 1]);
    }

    #[tokio::test]
    async fn sort_null_values_sort_last() {
        let out = output(
            json!({ "input_selector": "src", "sort_by": "x" }),
            json!([{ "x": 3 }, {}, { "x": 1 }]),
        )
        .await;
        let sorted = out.as_array().unwrap();
        assert_eq!(sorted[0]["x"], json!(1));
        assert_eq!(sorted[1]["x"], json!(3));
        assert!(sorted[2].get("x").is_none());
    }

    // ---- deduplicate ----

    #[tokio::test]
    async fn deduplicate_by_field_keeps_first() {
        let out = output(
            json!({ "input_selector": "src", "deduplicate_by": "id" }),
            json!([
                { "id": 1, "v": "a" },
                { "id": 2, "v": "b" },
                { "id": 1, "v": "c" }
            ]),
        )
        .await;
        let unique = out.as_array().unwrap();
        assert_eq!(unique.len(), 2);
        assert_eq!(unique[0]["v"], json!("a"));
    }

    #[tokio::test]
    async fn deduplicate_empty_key_uses_full_equality() {
        let out = output(
            json!({ "input_selector": "src", "deduplicate_by": "" }),
            json!([1, 2, 1, 3, 2]),
        )
        .await;
        assert_eq!(out.as_array().unwrap().len(), 3);
    }

    // ---- limit ----

    #[tokio::test]
    async fn limit_truncates_to_n() {
        let out = output(
            json!({ "input_selector": "src", "limit": 2 }),
            json!([10, 20, 30, 40]),
        )
        .await;
        assert_eq!(out, json!([10, 20]));
    }

    #[tokio::test]
    async fn limit_larger_than_array_keeps_all() {
        let out = output(
            json!({ "input_selector": "src", "limit": 100 }),
            json!([1, 2]),
        )
        .await;
        assert_eq!(out, json!([1, 2]));
    }

    // ---- combined pipeline ----

    #[tokio::test]
    async fn filter_sort_limit_combined() {
        let out = output(
            json!({
                "input_selector": "src",
                "filter": { "path": "active", "op": "eq", "value": true },
                "sort_by": "score",
                "sort_order": "desc",
                "limit": 2
            }),
            json!([
                { "name": "A", "score": 5, "active": true },
                { "name": "B", "score": 10, "active": false },
                { "name": "C", "score": 8, "active": true },
                { "name": "D", "score": 3, "active": true },
                { "name": "E", "score": 12, "active": true }
            ]),
        )
        .await;
        let top = out.as_array().unwrap();
        assert_eq!(top.len(), 2);
        // B (score 10) is filtered out before sorting, so E and C are the top two.
        assert_eq!(top[0]["name"], json!("E"));
        assert_eq!(top[1]["name"], json!("C"));
    }
}