a3s_flow/nodes/
subflow.rs1use std::collections::HashMap;
32use std::sync::Arc;
33
34use async_trait::async_trait;
35use serde_json::Value;
36
37use crate::error::{FlowError, Result};
38use crate::graph::DagGraph;
39use crate::node::{ExecContext, Node};
40use crate::runner::FlowRunner;
41
42pub struct SubFlowNode;
43
44#[async_trait]
45impl Node for SubFlowNode {
46 fn node_type(&self) -> &str {
47 "sub-flow"
48 }
49
50 async fn execute(&self, ctx: ExecContext) -> Result<Value> {
51 let name = ctx
52 .data
53 .get("name")
54 .and_then(|v| v.as_str())
55 .ok_or_else(|| {
56 FlowError::InvalidDefinition(
57 "sub-flow node requires a \"name\" string in data".into(),
58 )
59 })?;
60
61 let store = ctx.flow_store.as_ref().ok_or_else(|| {
62 FlowError::Internal(
63 "sub-flow node requires a FlowStore; configure one via \
64 FlowEngine::with_flow_store"
65 .into(),
66 )
67 })?;
68
69 let definition = store
70 .load(name)
71 .await?
72 .ok_or_else(|| FlowError::FlowNotFound(name.to_string()))?;
73
74 let dag = DagGraph::from_json(&definition)?;
75
76 let mut variables: HashMap<String, Value> = ctx.variables.clone();
78 if let Some(overrides) = ctx.data.get("variables").and_then(|v| v.as_object()) {
79 for (k, v) in overrides {
80 variables.insert(k.clone(), v.clone());
81 }
82 }
83
84 let mut runner = FlowRunner::with_arc_registry(dag, Arc::clone(&ctx.registry));
85 if let Some(fs) = ctx.flow_store {
86 runner = runner.with_flow_store(fs);
87 }
88
89 let result = runner.run(variables).await?;
90
91 Ok(Value::Object(result.outputs.into_iter().collect()))
93 }
94}
95
96#[cfg(test)]
97mod tests {
98 use super::*;
99 use crate::flow_store::{FlowStore, MemoryFlowStore};
100 use serde_json::json;
101 use std::sync::Arc;
102
103 #[tokio::test]
104 async fn sub_flow_runs_named_flow_and_returns_outputs() {
105 let store = Arc::new(MemoryFlowStore::new());
106 let def = json!({
107 "nodes": [{ "id": "x", "type": "noop" }],
108 "edges": []
109 });
110 store.save("inner", &def).await.unwrap();
111
112 let ctx = ExecContext {
113 data: json!({ "name": "inner" }),
114 flow_store: Some(store),
115 ..Default::default()
116 };
117
118 let node = SubFlowNode;
119 let output = node.execute(ctx).await.unwrap();
120 assert!(output.get("x").is_some());
122 }
123
124 #[tokio::test]
125 async fn sub_flow_missing_name_returns_error() {
126 let ctx = ExecContext {
127 data: json!({}),
128 ..Default::default()
129 };
130 let result = SubFlowNode.execute(ctx).await;
131 assert!(matches!(result, Err(FlowError::InvalidDefinition(_))));
132 }
133
134 #[tokio::test]
135 async fn sub_flow_no_store_returns_internal_error() {
136 let ctx = ExecContext {
137 data: json!({ "name": "any" }),
138 flow_store: None,
139 ..Default::default()
140 };
141 let result = SubFlowNode.execute(ctx).await;
142 assert!(matches!(result, Err(FlowError::Internal(_))));
143 }
144
145 #[tokio::test]
146 async fn sub_flow_unknown_name_returns_flow_not_found() {
147 let store = Arc::new(MemoryFlowStore::new());
148 let ctx = ExecContext {
149 data: json!({ "name": "nonexistent" }),
150 flow_store: Some(store),
151 ..Default::default()
152 };
153 let result = SubFlowNode.execute(ctx).await;
154 assert!(matches!(result, Err(FlowError::FlowNotFound(_))));
155 }
156
157 #[tokio::test]
158 async fn sub_flow_inherits_and_overrides_variables() {
159 use crate::node::Node as _;
160 use crate::registry::NodeRegistry;
161
162 let store = Arc::new(MemoryFlowStore::new());
164 let def = json!({
165 "nodes": [{
166 "id": "read",
167 "type": "code",
168 "data": {
169 "language": "rhai",
170 "code": "let v = variables[\"x\"]; #{result: v}"
171 }
172 }],
173 "edges": []
174 });
175 store.save("var-flow", &def).await.unwrap();
176
177 let mut parent_vars = HashMap::new();
178 parent_vars.insert("x".to_string(), json!(10));
179
180 let ctx = ExecContext {
181 data: json!({
182 "name": "var-flow",
183 "variables": { "x": 42 } }),
185 variables: parent_vars,
186 flow_store: Some(store),
187 registry: Arc::new(NodeRegistry::with_defaults()),
188 ..Default::default()
189 };
190
191 let output = SubFlowNode.execute(ctx).await.unwrap();
192 assert_eq!(output["read"]["result"], json!(42));
193 }
194}