a3s_flow/nodes/variable_aggregator.rs
1//! Built-in `"variable-aggregator"` node — collects outputs from multiple
2//! upstream branches and returns the first non-null value.
3//!
4//! Mirrors Dify's Variable Aggregator node. Typical use: merge the outputs
5//! of an `"if-else"` fan-out back into a single value for downstream nodes.
6//!
7//! # Config schema
8//!
9//! ```json
10//! {
11//! "inputs": ["branch_ok", "branch_error"]
12//! }
13//! ```
14//!
15//! | Field | Type | Description |
16//! |-------|------|-------------|
17//! | `inputs` | string[] | Optional. Try these node IDs in order. If omitted, all inputs are tried in alphabetical key order. |
18//!
19//! # Output schema
20//!
21//! ```json
22//! { "output": { "body": "..." } }
23//! ```
24//!
25//! Returns `{ "output": null }` when all upstream values are `null` (all
26//! branches were skipped).
27//!
28//! # Example
29//!
30//! ```json
31//! {
32//! "nodes": [
33//! { "id": "route", "type": "if-else", "data": { ... } },
34//! { "id": "path_a", "type": "http-request", "data": { "run_if": { ... } } },
35//! { "id": "path_b", "type": "http-request", "data": { "run_if": { ... } } },
36//! { "id": "merge", "type": "variable-aggregator", "data": { "inputs": ["path_a", "path_b"] } }
37//! ],
38//! "edges": [
39//! { "source": "route", "target": "path_a" },
40//! { "source": "route", "target": "path_b" },
41//! { "source": "path_a", "target": "merge" },
42//! { "source": "path_b", "target": "merge" }
43//! ]
44//! }
45//! ```
46
47use async_trait::async_trait;
48use serde_json::{json, Value};
49
50use crate::error::Result;
51use crate::node::{ExecContext, Node};
52
53/// Variable aggregator node (Dify-compatible).
54pub struct VariableAggregatorNode;
55
56#[async_trait]
57impl Node for VariableAggregatorNode {
58 fn node_type(&self) -> &str {
59 "variable-aggregator"
60 }
61
62 async fn execute(&self, ctx: ExecContext) -> Result<Value> {
63 // Determine the order to try inputs.
64 let order: Option<Vec<String>> = ctx
65 .data
66 .get("inputs")
67 .and_then(|v| serde_json::from_value(v.clone()).ok());
68
69 let first_non_null = if let Some(keys) = order {
70 keys.iter()
71 .find_map(|k| ctx.inputs.get(k).filter(|v| !v.is_null()))
72 .cloned()
73 } else {
74 // Alphabetical order for determinism when no explicit order given.
75 let mut keys: Vec<&String> = ctx.inputs.keys().collect();
76 keys.sort();
77 keys.into_iter()
78 .find_map(|k| ctx.inputs.get(k).filter(|v| !v.is_null()))
79 .cloned()
80 };
81
82 Ok(json!({ "output": first_non_null }))
83 }
84}
85
86#[cfg(test)]
87mod tests {
88 use super::*;
89 use std::collections::HashMap;
90
91 fn ctx(inputs: HashMap<String, Value>, data: Value) -> ExecContext {
92 ExecContext {
93 data,
94 inputs,
95 variables: HashMap::new(),
96 ..Default::default()
97 }
98 }
99
100 #[tokio::test]
101 async fn returns_first_non_null_in_explicit_order() {
102 let node = VariableAggregatorNode;
103 let out = node
104 .execute(ctx(
105 HashMap::from([
106 ("a".into(), json!(null)),
107 ("b".into(), json!({ "v": 42 })),
108 ("c".into(), json!({ "v": 99 })),
109 ]),
110 json!({ "inputs": ["a", "b", "c"] }),
111 ))
112 .await
113 .unwrap();
114 assert_eq!(out["output"]["v"], 42);
115 }
116
117 #[tokio::test]
118 async fn returns_null_when_all_skipped() {
119 let node = VariableAggregatorNode;
120 let out = node
121 .execute(ctx(
122 HashMap::from([("a".into(), json!(null)), ("b".into(), json!(null))]),
123 json!({ "inputs": ["a", "b"] }),
124 ))
125 .await
126 .unwrap();
127 assert!(out["output"].is_null());
128 }
129
130 #[tokio::test]
131 async fn alphabetical_order_when_no_config() {
132 let node = VariableAggregatorNode;
133 let out = node
134 .execute(ctx(
135 HashMap::from([
136 ("z".into(), json!("last")),
137 ("a".into(), json!(null)),
138 ("m".into(), json!("first_non_null")),
139 ]),
140 json!({}),
141 ))
142 .await
143 .unwrap();
144 assert_eq!(out["output"], "first_non_null");
145 }
146}