Skip to main content

a3s_flow/nodes/
list_operator.rs

1//! `"list-operator"` node — pure JSON array operations.
2//!
3//! Applies a configurable pipeline of operations to an input array in a fixed
4//! order: **filter → sort → deduplicate → limit**. Every operation is optional.
5//!
6//! All operations are pure in-process JSON logic — no LLM or network calls.
7//!
8//! # Config schema
9//!
10//! ```json
11//! {
12//!   "input_selector": "fetch.body.items",
13//!   "filter":          { "path": "active", "op": "eq", "value": true },
14//!   "sort_by":         "name",
15//!   "sort_order":      "asc",
16//!   "deduplicate_by":  "id",
17//!   "limit":           10
18//! }
19//! ```
20//!
21//! | Field | Type | Required | Description |
22//! |-------|------|:--------:|-------------|
23//! | `input_selector` | string | ✅ | Dot path into upstream inputs: `"node_id"` or `"node_id.field.subfield"` |
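//! | `filter` | object | — | Keep items where condition is true. Fields: `path` (dot path into each element; when omitted or empty the element itself is tested), `op` (`"eq"`, `"ne"`, `"gt"`, `"lt"`, `"gte"`, `"lte"`, `"contains"`), `value`. Elements that lack `path` are dropped; `gt`/`lt`/`gte`/`lte` compare numbers only and are false for non-numeric operands |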
25//! | `sort_by` | string | — | Dot path into each element to sort by. Numbers and strings are both supported; nulls sort last |
26//! | `sort_order` | string | — | `"asc"` (default) or `"desc"` |
27//! | `deduplicate_by` | string | — | Dot path into each element; keeps the first occurrence of each unique value at that path. Empty string deduplicates by full element equality |
28//! | `limit` | integer | — | Keep only the first N elements (applied last) |
29//!
30//! Operations are applied in this fixed order: filter → sort → deduplicate → limit.
31//!
32//! # Output schema
33//!
34//! ```json
35//! { "output": [ ... ] }
36//! ```
37//!
38//! # Example
39//!
40//! ```json
41//! {
42//!   "id": "clean",
43//!   "type": "list-operator",
44//!   "data": {
45//!     "input_selector":  "fetch.body.users",
46//!     "filter":          { "path": "active", "op": "eq", "value": true },
47//!     "sort_by":         "name",
48//!     "sort_order":      "asc",
49//!     "deduplicate_by":  "email",
50//!     "limit":           100
51//!   }
52//! }
53//! ```
54
55use std::collections::{HashMap, HashSet};
56
57use async_trait::async_trait;
58use serde_json::{json, Value};
59
60use crate::condition::{get_path, CondOp};
61use crate::error::{FlowError, Result};
62use crate::node::{ExecContext, Node};
63
/// List operator node — filter / sort / deduplicate / limit a JSON array.
///
/// The struct carries no state of its own: every setting is read from the
/// execution context's `data` object each time the node is executed.
pub struct ListOperatorNode;
66
67#[async_trait]
68impl Node for ListOperatorNode {
69    fn node_type(&self) -> &str {
70        "list-operator"
71    }
72
73    async fn execute(&self, ctx: ExecContext) -> Result<Value> {
74        // ── Resolve input array ───────────────────────────────────────────
75        let input_selector = ctx.data["input_selector"].as_str().ok_or_else(|| {
76            FlowError::InvalidDefinition("list-operator: missing data.input_selector".into())
77        })?;
78
79        let mut items = resolve_input_array(&ctx.inputs, input_selector)?;
80
81        // ── Apply operations in fixed order ───────────────────────────────
82        if !ctx.data["filter"].is_null() {
83            items = apply_filter(items, &ctx.data["filter"])?;
84        }
85        if let Some(key) = ctx.data["sort_by"].as_str() {
86            let order = ctx.data["sort_order"].as_str().unwrap_or("asc");
87            items = apply_sort(items, key, order);
88        }
89        if let Some(key) = ctx.data["deduplicate_by"].as_str() {
90            items = apply_deduplicate(items, key);
91        }
92        if let Some(n) = ctx.data["limit"].as_u64() {
93            items.truncate(n as usize);
94        }
95
96        Ok(json!({ "output": items }))
97    }
98}
99
100// ── Operation implementations ──────────────────────────────────────────────
101
102/// Resolve `"node_id"` or `"node_id.field.subfield"` from upstream inputs.
103fn resolve_input_array(inputs: &HashMap<String, Value>, selector: &str) -> Result<Vec<Value>> {
104    let (node_id, rest) = match selector.find('.') {
105        Some(pos) => (&selector[..pos], &selector[pos + 1..]),
106        None => (selector, ""),
107    };
108
109    let node_out = inputs.get(node_id).ok_or_else(|| {
110        FlowError::InvalidDefinition(format!(
111            "list-operator: input_selector '{selector}' references unknown node '{node_id}'"
112        ))
113    })?;
114
115    let value = if rest.is_empty() {
116        node_out
117    } else {
118        get_path(node_out, rest).ok_or_else(|| {
119            FlowError::InvalidDefinition(format!(
120                "list-operator: path '{rest}' not found in node '{node_id}' output"
121            ))
122        })?
123    };
124
125    value
126        .as_array()
127        .ok_or_else(|| {
128            FlowError::InvalidDefinition(format!(
129                "list-operator: input_selector '{selector}' must point to a JSON array"
130            ))
131        })
132        .cloned()
133}
134
135/// Keep elements where the filter condition evaluates to true.
136fn apply_filter(items: Vec<Value>, filter: &Value) -> Result<Vec<Value>> {
137    let path = filter["path"].as_str().unwrap_or("");
138    let op: CondOp = serde_json::from_value(filter["op"].clone()).map_err(|e| {
139        FlowError::InvalidDefinition(format!("list-operator: invalid filter.op: {e}"))
140    })?;
141    let expected = &filter["value"];
142
143    let mut out = Vec::with_capacity(items.len());
144    for item in items {
145        let actual = if path.is_empty() {
146            &item
147        } else {
148            match get_path(&item, path) {
149                Some(v) => v,
150                None => continue, // path missing → filter out
151            }
152        };
153        if compare_values(actual, &op, expected) {
154            out.push(item);
155        }
156    }
157    Ok(out)
158}
159
160/// Sort elements by a dot-path field.
161///
162/// Numbers compare numerically, strings lexicographically, nulls sort last.
163/// Mixed types (number vs string) sort numbers before strings.
164fn apply_sort(mut items: Vec<Value>, key: &str, order: &str) -> Vec<Value> {
165    let descending = order == "desc";
166    items.sort_by(|a, b| {
167        let av = if key.is_empty() {
168            Some(a)
169        } else {
170            get_path(a, key)
171        };
172        let bv = if key.is_empty() {
173            Some(b)
174        } else {
175            get_path(b, key)
176        };
177        let ord = compare_sort_values(av, bv);
178        if descending {
179            ord.reverse()
180        } else {
181            ord
182        }
183    });
184    items
185}
186
187/// Remove duplicate elements by a dot-path key (first occurrence wins).
188///
189/// Empty key deduplicates by full element equality (serialized to JSON string).
190fn apply_deduplicate(items: Vec<Value>, key: &str) -> Vec<Value> {
191    let mut seen: HashSet<String> = HashSet::new();
192    let mut out = Vec::with_capacity(items.len());
193    for item in items {
194        let fingerprint = if key.is_empty() {
195            item.to_string()
196        } else {
197            get_path(&item, key)
198                .map(|v| v.to_string())
199                .unwrap_or_default()
200        };
201        if seen.insert(fingerprint) {
202            out.push(item);
203        }
204    }
205    out
206}
207
208// ── Comparison helpers ─────────────────────────────────────────────────────
209
210fn compare_values(actual: &Value, op: &CondOp, expected: &Value) -> bool {
211    match op {
212        CondOp::Eq => actual == expected,
213        CondOp::Ne => actual != expected,
214        CondOp::Gt => numeric_cmp(actual, expected)
215            .map(|o| o.is_gt())
216            .unwrap_or(false),
217        CondOp::Lt => numeric_cmp(actual, expected)
218            .map(|o| o.is_lt())
219            .unwrap_or(false),
220        CondOp::Gte => numeric_cmp(actual, expected)
221            .map(|o| o.is_ge())
222            .unwrap_or(false),
223        CondOp::Lte => numeric_cmp(actual, expected)
224            .map(|o| o.is_le())
225            .unwrap_or(false),
226        CondOp::Contains => match (actual, expected) {
227            (Value::String(s), Value::String(sub)) => s.contains(sub.as_str()),
228            (Value::Array(arr), v) => arr.contains(v),
229            _ => false,
230        },
231    }
232}
233
234fn numeric_cmp(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
235    a.as_f64()?.partial_cmp(&b.as_f64()?)
236}
237
238fn compare_sort_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
239    use std::cmp::Ordering;
240    match (a, b) {
241        (None, None) => Ordering::Equal,
242        (None, Some(_)) => Ordering::Greater, // null sorts last
243        (Some(_), None) => Ordering::Less,
244        (Some(av), Some(bv)) => {
245            // Both numeric → numeric comparison.
246            if let (Some(an), Some(bn)) = (av.as_f64(), bv.as_f64()) {
247                return an.partial_cmp(&bn).unwrap_or(Ordering::Equal);
248            }
249            // Both strings → lexicographic.
250            if let (Some(as_), Some(bs)) = (av.as_str(), bv.as_str()) {
251                return as_.cmp(bs);
252            }
253            // Mixed / other → fall back to string representation.
254            av.to_string().cmp(&bv.to_string())
255        }
256    }
257}
258
259// ── Tests ─────────────────────────────────────────────────────────────────────
260
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Build a context with `data` as the node config and a single upstream
    /// output (`array`) registered under `input_node`.
    fn ctx_with(data: Value, input_node: &str, array: Value) -> ExecContext {
        let mut inputs = HashMap::new();
        inputs.insert(input_node.to_string(), array);
        ExecContext {
            data,
            inputs,
            ..Default::default()
        }
    }

    /// Execute the node with `data` against an upstream node named `"src"`.
    async fn run(data: Value, upstream: Value) -> std::result::Result<Value, FlowError> {
        ListOperatorNode
            .execute(ctx_with(data, "src", upstream))
            .await
    }

    /// Borrow the `"output"` array of a node result.
    fn output_of(result: &Value) -> &Vec<Value> {
        result["output"].as_array().unwrap()
    }

    // ── input resolution ───────────────────────────────────────────────────

    #[tokio::test]
    async fn resolves_root_array() {
        let out = run(json!({ "input_selector": "src" }), json!([1, 2, 3]))
            .await
            .unwrap();
        assert_eq!(out["output"], json!([1, 2, 3]));
    }

    #[tokio::test]
    async fn resolves_nested_array() {
        let out = run(
            json!({ "input_selector": "src.items" }),
            json!({ "items": [4, 5, 6] }),
        )
        .await
        .unwrap();
        assert_eq!(out["output"], json!([4, 5, 6]));
    }

    #[tokio::test]
    async fn rejects_missing_input_selector() {
        let ctx = ExecContext {
            data: json!({}),
            ..Default::default()
        };
        let err = ListOperatorNode.execute(ctx).await.unwrap_err();
        assert!(matches!(err, FlowError::InvalidDefinition(_)));
    }

    #[tokio::test]
    async fn rejects_non_array_input() {
        let err = run(json!({ "input_selector": "src" }), json!("not an array"))
            .await
            .unwrap_err();
        assert!(matches!(err, FlowError::InvalidDefinition(_)));
    }

    // ── filter ─────────────────────────────────────────────────────────────

    #[tokio::test]
    async fn filter_eq_keeps_matching_items() {
        let config = json!({
            "input_selector": "src",
            "filter": { "path": "active", "op": "eq", "value": true }
        });
        let people = json!([
            { "name": "Alice", "active": true },
            { "name": "Bob",   "active": false },
            { "name": "Carol", "active": true }
        ]);
        let out = run(config, people).await.unwrap();
        let arr = output_of(&out);
        assert_eq!(arr.len(), 2);
        assert_eq!(arr[0]["name"], json!("Alice"));
        assert_eq!(arr[1]["name"], json!("Carol"));
    }

    #[tokio::test]
    async fn filter_gt_keeps_numeric_matches() {
        let config = json!({
            "input_selector": "src",
            "filter": { "path": "score", "op": "gt", "value": 5 }
        });
        let scores = json!([{ "score": 3 }, { "score": 7 }, { "score": 10 }]);
        let out = run(config, scores).await.unwrap();
        assert_eq!(output_of(&out).len(), 2);
    }

    #[tokio::test]
    async fn filter_contains_string() {
        let config = json!({
            "input_selector": "src",
            "filter": { "path": "tag", "op": "contains", "value": "rust" }
        });
        let tags = json!([
            { "tag": "rust-2024" },
            { "tag": "python" },
            { "tag": "rust-async" }
        ]);
        let out = run(config, tags).await.unwrap();
        assert_eq!(output_of(&out).len(), 2);
    }

    #[tokio::test]
    async fn filter_on_missing_path_excludes_item() {
        let config = json!({
            "input_selector": "src",
            "filter": { "path": "missing_field", "op": "eq", "value": true }
        });
        let out = run(config, json!([{ "x": 1 }, { "x": 2 }])).await.unwrap();
        assert_eq!(out["output"], json!([]));
    }

    // ── sort ───────────────────────────────────────────────────────────────

    #[tokio::test]
    async fn sort_strings_ascending() {
        let out = run(
            json!({ "input_selector": "src", "sort_by": "name" }),
            json!([{ "name": "Charlie" }, { "name": "Alice" }, { "name": "Bob" }]),
        )
        .await
        .unwrap();
        let names: Vec<&str> = output_of(&out)
            .iter()
            .map(|v| v["name"].as_str().unwrap())
            .collect();
        assert_eq!(names, ["Alice", "Bob", "Charlie"]);
    }

    #[tokio::test]
    async fn sort_numbers_descending() {
        let out = run(
            json!({ "input_selector": "src", "sort_by": "score", "sort_order": "desc" }),
            json!([{ "score": 3 }, { "score": 9 }, { "score": 1 }]),
        )
        .await
        .unwrap();
        let scores: Vec<i64> = output_of(&out)
            .iter()
            .map(|v| v["score"].as_i64().unwrap())
            .collect();
        assert_eq!(scores, [9, 3, 1]);
    }

    #[tokio::test]
    async fn sort_null_values_sort_last() {
        let out = run(
            json!({ "input_selector": "src", "sort_by": "x" }),
            json!([{ "x": 3 }, {}, { "x": 1 }]),
        )
        .await
        .unwrap();
        let arr = output_of(&out);
        assert_eq!(arr[0]["x"], json!(1));
        assert_eq!(arr[1]["x"], json!(3));
        // The element without an "x" key ends up last.
        assert!(arr[2].get("x").is_none());
    }

    // ── deduplicate ────────────────────────────────────────────────────────

    #[tokio::test]
    async fn deduplicate_by_field_keeps_first() {
        // The third element repeats id 1 and must be dropped.
        let rows = json!([
            { "id": 1, "v": "a" },
            { "id": 2, "v": "b" },
            { "id": 1, "v": "c" }
        ]);
        let out = run(
            json!({ "input_selector": "src", "deduplicate_by": "id" }),
            rows,
        )
        .await
        .unwrap();
        let arr = output_of(&out);
        assert_eq!(arr.len(), 2);
        // The first occurrence of id 1 is the one that survives.
        assert_eq!(arr[0]["v"], json!("a"));
    }

    #[tokio::test]
    async fn deduplicate_empty_key_uses_full_equality() {
        let out = run(
            json!({ "input_selector": "src", "deduplicate_by": "" }),
            json!([1, 2, 1, 3, 2]),
        )
        .await
        .unwrap();
        assert_eq!(output_of(&out).len(), 3);
    }

    // ── limit ──────────────────────────────────────────────────────────────

    #[tokio::test]
    async fn limit_truncates_to_n() {
        let out = run(
            json!({ "input_selector": "src", "limit": 2 }),
            json!([10, 20, 30, 40]),
        )
        .await
        .unwrap();
        assert_eq!(out["output"], json!([10, 20]));
    }

    #[tokio::test]
    async fn limit_larger_than_array_keeps_all() {
        let out = run(
            json!({ "input_selector": "src", "limit": 100 }),
            json!([1, 2]),
        )
        .await
        .unwrap();
        assert_eq!(out["output"], json!([1, 2]));
    }

    // ── combined pipeline ──────────────────────────────────────────────────

    #[tokio::test]
    async fn filter_sort_limit_combined() {
        let config = json!({
            "input_selector": "src",
            "filter": { "path": "active", "op": "eq", "value": true },
            "sort_by": "score",
            "sort_order": "desc",
            "limit": 2
        });
        let rows = json!([
            { "name": "A", "score": 5,  "active": true  },
            { "name": "B", "score": 10, "active": false },
            { "name": "C", "score": 8,  "active": true  },
            { "name": "D", "score": 3,  "active": true  },
            { "name": "E", "score": 12, "active": true  }
        ]);
        let out = run(config, rows).await.unwrap();
        let arr = output_of(&out);
        // Active: A(5), C(8), D(3), E(12). Sorted desc: E(12), C(8), A(5), D(3). Limit 2: E, C.
        assert_eq!(arr.len(), 2);
        assert_eq!(arr[0]["name"], json!("E"));
        assert_eq!(arr[1]["name"], json!("C"));
    }
}