Skip to main content

a3s_flow/nodes/
variable_aggregator.rs

1//! Built-in `"variable-aggregator"` node — collects outputs from multiple
2//! upstream branches and returns the first non-null value.
3//!
4//! Mirrors Dify's Variable Aggregator node. Typical use: merge the outputs
5//! of an `"if-else"` fan-out back into a single value for downstream nodes.
6//!
7//! # Config schema
8//!
9//! ```json
10//! {
11//!   "inputs": ["branch_ok", "branch_error"]
12//! }
13//! ```
14//!
15//! | Field | Type | Description |
16//! |-------|------|-------------|
//! | `inputs` | string[] | Optional. Try these node IDs in order. If omitted (or not an array of strings), all inputs are tried in alphabetical key order. |
18//!
19//! # Output schema
20//!
21//! ```json
22//! { "output": { "body": "..." } }
23//! ```
24//!
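//! Returns `{ "output": null }` when no candidate yields a non-null value —
//! for example when all upstream values are `null` (all branches were
//! skipped) or none of the listed node IDs are present in the inputs.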
27//!
28//! # Example
29//!
30//! ```json
31//! {
32//!   "nodes": [
33//!     { "id": "route",  "type": "if-else", "data": { ... } },
34//!     { "id": "path_a", "type": "http-request", "data": { "run_if": { ... } } },
35//!     { "id": "path_b", "type": "http-request", "data": { "run_if": { ... } } },
36//!     { "id": "merge",  "type": "variable-aggregator", "data": { "inputs": ["path_a", "path_b"] } }
37//!   ],
38//!   "edges": [
39//!     { "source": "route",  "target": "path_a" },
40//!     { "source": "route",  "target": "path_b" },
41//!     { "source": "path_a", "target": "merge" },
42//!     { "source": "path_b", "target": "merge" }
43//!   ]
44//! }
45//! ```
46
47use async_trait::async_trait;
48use serde_json::{json, Value};
49
50use crate::error::Result;
51use crate::node::{ExecContext, Node};
52
/// Variable aggregator node (Dify-compatible).
///
/// A field-less unit type: it holds no state of its own, so it is cheap to
/// copy and can be created with `VariableAggregatorNode` or
/// `VariableAggregatorNode::default()`. The aggregation behaviour lives in
/// its `Node` implementation.
#[derive(Debug, Clone, Copy, Default)]
pub struct VariableAggregatorNode;
55
56#[async_trait]
57impl Node for VariableAggregatorNode {
58    fn node_type(&self) -> &str {
59        "variable-aggregator"
60    }
61
62    async fn execute(&self, ctx: ExecContext) -> Result<Value> {
63        // Determine the order to try inputs.
64        let order: Option<Vec<String>> = ctx
65            .data
66            .get("inputs")
67            .and_then(|v| serde_json::from_value(v.clone()).ok());
68
69        let first_non_null = if let Some(keys) = order {
70            keys.iter()
71                .find_map(|k| ctx.inputs.get(k).filter(|v| !v.is_null()))
72                .cloned()
73        } else {
74            // Alphabetical order for determinism when no explicit order given.
75            let mut keys: Vec<&String> = ctx.inputs.keys().collect();
76            keys.sort();
77            keys.into_iter()
78                .find_map(|k| ctx.inputs.get(k).filter(|v| !v.is_null()))
79                .cloned()
80        };
81
82        Ok(json!({ "output": first_non_null }))
83    }
84}
85
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Runs a `VariableAggregatorNode` against the given upstream values and
    /// node config, returning the node's JSON result.
    ///
    /// `upstream` is a list of `(node id, value)` pairs that becomes the
    /// context's `inputs` map; `config` becomes the context's `data`.
    /// Panics if the node returns an error.
    async fn aggregate(upstream: Vec<(&str, Value)>, config: Value) -> Value {
        let inputs: HashMap<String, Value> = upstream
            .into_iter()
            .map(|(id, value)| (id.to_string(), value))
            .collect();

        let exec_ctx = ExecContext {
            data: config,
            inputs,
            variables: HashMap::new(),
            ..Default::default()
        };

        let node = VariableAggregatorNode;
        node.execute(exec_ctx).await.unwrap()
    }

    /// With an explicit order, a leading `null` entry is passed over and the
    /// next non-null entry wins, even though a later one is also non-null.
    #[tokio::test]
    async fn returns_first_non_null_in_explicit_order() {
        let upstream = vec![
            ("a", json!(null)),
            ("b", json!({ "v": 42 })),
            ("c", json!({ "v": 99 })),
        ];
        let out = aggregate(upstream, json!({ "inputs": ["a", "b", "c"] })).await;
        assert_eq!(out["output"]["v"], 42);
    }

    /// When every listed input is `null`, the output is `null`.
    #[tokio::test]
    async fn returns_null_when_all_skipped() {
        let upstream = vec![("a", json!(null)), ("b", json!(null))];
        let out = aggregate(upstream, json!({ "inputs": ["a", "b"] })).await;
        assert!(out["output"].is_null());
    }

    /// Without an `inputs` config, keys are tried alphabetically: `a` is
    /// `null`, so `m` wins ahead of `z`.
    #[tokio::test]
    async fn alphabetical_order_when_no_config() {
        let upstream = vec![
            ("z", json!("last")),
            ("a", json!(null)),
            ("m", json!("first_non_null")),
        ];
        let out = aggregate(upstream, json!({})).await;
        assert_eq!(out["output"], "first_non_null");
    }
}