a3s_flow/nodes/
subflow.rs1use std::collections::HashMap;
32use std::sync::Arc;
33
34use async_trait::async_trait;
35use serde_json::Value;
36
37use crate::error::{FlowError, Result};
38use crate::graph::DagGraph;
39use crate::node::{ExecContext, Node};
40use crate::runner::FlowRunner;
41
42pub struct SubFlowNode;
43
44#[async_trait]
45impl Node for SubFlowNode {
46 fn node_type(&self) -> &str {
47 "sub-flow"
48 }
49
50 async fn execute(&self, ctx: ExecContext) -> Result<Value> {
51 let name = ctx
52 .data
53 .get("name")
54 .and_then(|v| v.as_str())
55 .ok_or_else(|| {
56 FlowError::InvalidDefinition(
57 "sub-flow node requires a \"name\" string in data".into(),
58 )
59 })?;
60
61 let store = ctx.flow_store.as_ref().ok_or_else(|| {
62 FlowError::Internal(
63 "sub-flow node requires a FlowStore; configure one via \
64 FlowEngine::with_flow_store"
65 .into(),
66 )
67 })?;
68
69 let definition = store
70 .load(name)
71 .await?
72 .ok_or_else(|| FlowError::FlowNotFound(name.to_string()))?;
73
74 let dag = DagGraph::from_json(&definition)?;
75
76 let mut variables: HashMap<String, Value> = ctx.variables.clone();
78 if let Some(overrides) = ctx.data.get("variables").and_then(|v| v.as_object()) {
79 for (k, v) in overrides {
80 variables.insert(k.clone(), v.clone());
81 }
82 }
83
84 let mut runner = FlowRunner::with_arc_registry(dag, Arc::clone(&ctx.registry));
85 if let Some(fs) = ctx.flow_store {
86 runner = runner.with_flow_store(fs);
87 }
88
89 let result = runner.run(variables).await?;
90
91 Ok(Value::Object(
93 result.outputs.into_iter().map(|(k, v)| (k, v)).collect(),
94 ))
95 }
96}
97
98#[cfg(test)]
99mod tests {
100 use super::*;
101 use crate::flow_store::{FlowStore, MemoryFlowStore};
102 use serde_json::json;
103 use std::sync::Arc;
104
105 #[tokio::test]
106 async fn sub_flow_runs_named_flow_and_returns_outputs() {
107 let store = Arc::new(MemoryFlowStore::new());
108 let def = json!({
109 "nodes": [{ "id": "x", "type": "noop" }],
110 "edges": []
111 });
112 store.save("inner", &def).await.unwrap();
113
114 let ctx = ExecContext {
115 data: json!({ "name": "inner" }),
116 flow_store: Some(store),
117 ..Default::default()
118 };
119
120 let node = SubFlowNode;
121 let output = node.execute(ctx).await.unwrap();
122 assert!(output.get("x").is_some());
124 }
125
126 #[tokio::test]
127 async fn sub_flow_missing_name_returns_error() {
128 let ctx = ExecContext {
129 data: json!({}),
130 ..Default::default()
131 };
132 let result = SubFlowNode.execute(ctx).await;
133 assert!(matches!(result, Err(FlowError::InvalidDefinition(_))));
134 }
135
136 #[tokio::test]
137 async fn sub_flow_no_store_returns_internal_error() {
138 let ctx = ExecContext {
139 data: json!({ "name": "any" }),
140 flow_store: None,
141 ..Default::default()
142 };
143 let result = SubFlowNode.execute(ctx).await;
144 assert!(matches!(result, Err(FlowError::Internal(_))));
145 }
146
147 #[tokio::test]
148 async fn sub_flow_unknown_name_returns_flow_not_found() {
149 let store = Arc::new(MemoryFlowStore::new());
150 let ctx = ExecContext {
151 data: json!({ "name": "nonexistent" }),
152 flow_store: Some(store),
153 ..Default::default()
154 };
155 let result = SubFlowNode.execute(ctx).await;
156 assert!(matches!(result, Err(FlowError::FlowNotFound(_))));
157 }
158
159 #[tokio::test]
160 async fn sub_flow_inherits_and_overrides_variables() {
161 use crate::node::Node as _;
162 use crate::registry::NodeRegistry;
163
164 let store = Arc::new(MemoryFlowStore::new());
166 let def = json!({
167 "nodes": [{
168 "id": "read",
169 "type": "code",
170 "data": {
171 "language": "rhai",
172 "code": "let v = variables[\"x\"]; #{result: v}"
173 }
174 }],
175 "edges": []
176 });
177 store.save("var-flow", &def).await.unwrap();
178
179 let mut parent_vars = HashMap::new();
180 parent_vars.insert("x".to_string(), json!(10));
181
182 let ctx = ExecContext {
183 data: json!({
184 "name": "var-flow",
185 "variables": { "x": 42 } }),
187 variables: parent_vars,
188 flow_store: Some(store),
189 registry: Arc::new(NodeRegistry::with_defaults()),
190 ..Default::default()
191 };
192
193 let output = SubFlowNode.execute(ctx).await.unwrap();
194 assert_eq!(output["read"]["result"], json!(42));
195 }
196}