Skip to main content

a3s_flow/nodes/
subflow.rs

1//! `"sub-flow"` node — execute a named flow as an inline step.
2//!
3//! Loads the named flow definition from the engine's [`FlowStore`], parses it,
4//! and runs it synchronously as part of the parent flow's wave. The sub-flow
5//! inherits the parent's node registry and variables; the `data["variables"]`
6//! map (if present) extends or overrides them.
7//!
8//! The node output is a JSON object whose keys are the sub-flow's node IDs and
9//! whose values are those nodes' outputs — identical in shape to
10//! [`FlowResult::outputs`](crate::result::FlowResult::outputs).
11//!
12//! # Flow definition example
13//!
14//! ```json
15//! {
16//!   "id": "call-summarizer",
17//!   "type": "sub-flow",
18//!   "data": {
19//!     "name": "summarizer-flow",
20//!     "variables": { "max_tokens": 256 }
21//!   }
22//! }
23//! ```
24//!
25//! # Errors
26//!
27//! - [`FlowError::Internal`] — no [`FlowStore`] is configured on the engine.
28//! - [`FlowError::FlowNotFound`] — no flow with `name` exists in the store.
29//! - Any error propagated from the sub-flow's own execution.
30
31use std::collections::HashMap;
32use std::sync::Arc;
33
34use async_trait::async_trait;
35use serde_json::Value;
36
37use crate::error::{FlowError, Result};
38use crate::graph::DagGraph;
39use crate::node::{ExecContext, Node};
40use crate::runner::FlowRunner;
41
42pub struct SubFlowNode;
43
44#[async_trait]
45impl Node for SubFlowNode {
46    fn node_type(&self) -> &str {
47        "sub-flow"
48    }
49
50    async fn execute(&self, ctx: ExecContext) -> Result<Value> {
51        let name = ctx
52            .data
53            .get("name")
54            .and_then(|v| v.as_str())
55            .ok_or_else(|| {
56                FlowError::InvalidDefinition(
57                    "sub-flow node requires a \"name\" string in data".into(),
58                )
59            })?;
60
61        let store = ctx.flow_store.as_ref().ok_or_else(|| {
62            FlowError::Internal(
63                "sub-flow node requires a FlowStore; configure one via \
64                 FlowEngine::with_flow_store"
65                    .into(),
66            )
67        })?;
68
69        let definition = store
70            .load(name)
71            .await?
72            .ok_or_else(|| FlowError::FlowNotFound(name.to_string()))?;
73
74        let dag = DagGraph::from_json(&definition)?;
75
76        // Inherit parent variables; let data["variables"] extend/override them.
77        let mut variables: HashMap<String, Value> = ctx.variables.clone();
78        if let Some(overrides) = ctx.data.get("variables").and_then(|v| v.as_object()) {
79            for (k, v) in overrides {
80                variables.insert(k.clone(), v.clone());
81            }
82        }
83
84        let mut runner = FlowRunner::with_arc_registry(dag, Arc::clone(&ctx.registry));
85        if let Some(fs) = ctx.flow_store {
86            runner = runner.with_flow_store(fs);
87        }
88
89        let result = runner.run(variables).await?;
90
91        // Return the sub-flow's per-node outputs as a single JSON object.
92        Ok(Value::Object(
93            result.outputs.into_iter().map(|(k, v)| (k, v)).collect(),
94        ))
95    }
96}
97
98#[cfg(test)]
99mod tests {
100    use super::*;
101    use crate::flow_store::{FlowStore, MemoryFlowStore};
102    use serde_json::json;
103    use std::sync::Arc;
104
105    #[tokio::test]
106    async fn sub_flow_runs_named_flow_and_returns_outputs() {
107        let store = Arc::new(MemoryFlowStore::new());
108        let def = json!({
109            "nodes": [{ "id": "x", "type": "noop" }],
110            "edges": []
111        });
112        store.save("inner", &def).await.unwrap();
113
114        let ctx = ExecContext {
115            data: json!({ "name": "inner" }),
116            flow_store: Some(store),
117            ..Default::default()
118        };
119
120        let node = SubFlowNode;
121        let output = node.execute(ctx).await.unwrap();
122        // noop returns its inputs (empty here), so output["x"] == {}
123        assert!(output.get("x").is_some());
124    }
125
126    #[tokio::test]
127    async fn sub_flow_missing_name_returns_error() {
128        let ctx = ExecContext {
129            data: json!({}),
130            ..Default::default()
131        };
132        let result = SubFlowNode.execute(ctx).await;
133        assert!(matches!(result, Err(FlowError::InvalidDefinition(_))));
134    }
135
136    #[tokio::test]
137    async fn sub_flow_no_store_returns_internal_error() {
138        let ctx = ExecContext {
139            data: json!({ "name": "any" }),
140            flow_store: None,
141            ..Default::default()
142        };
143        let result = SubFlowNode.execute(ctx).await;
144        assert!(matches!(result, Err(FlowError::Internal(_))));
145    }
146
147    #[tokio::test]
148    async fn sub_flow_unknown_name_returns_flow_not_found() {
149        let store = Arc::new(MemoryFlowStore::new());
150        let ctx = ExecContext {
151            data: json!({ "name": "nonexistent" }),
152            flow_store: Some(store),
153            ..Default::default()
154        };
155        let result = SubFlowNode.execute(ctx).await;
156        assert!(matches!(result, Err(FlowError::FlowNotFound(_))));
157    }
158
159    #[tokio::test]
160    async fn sub_flow_inherits_and_overrides_variables() {
161        use crate::node::Node as _;
162        use crate::registry::NodeRegistry;
163
164        // Inner flow: a "code" node that reads a variable and returns it.
165        let store = Arc::new(MemoryFlowStore::new());
166        let def = json!({
167            "nodes": [{
168                "id": "read",
169                "type": "code",
170                "data": {
171                    "language": "rhai",
172                    "code": "let v = variables[\"x\"]; #{result: v}"
173                }
174            }],
175            "edges": []
176        });
177        store.save("var-flow", &def).await.unwrap();
178
179        let mut parent_vars = HashMap::new();
180        parent_vars.insert("x".to_string(), json!(10));
181
182        let ctx = ExecContext {
183            data: json!({
184                "name": "var-flow",
185                "variables": { "x": 42 }   // override parent's x=10
186            }),
187            variables: parent_vars,
188            flow_store: Some(store),
189            registry: Arc::new(NodeRegistry::with_defaults()),
190            ..Default::default()
191        };
192
193        let output = SubFlowNode.execute(ctx).await.unwrap();
194        assert_eq!(output["read"]["result"], json!(42));
195    }
196}