Skip to main content

a3s_flow/nodes/
subflow.rs

1//! `"sub-flow"` node — execute a named flow as an inline step.
2//!
3//! Loads the named flow definition from the engine's [`FlowStore`], parses it,
4//! and runs it synchronously as part of the parent flow's wave. The sub-flow
5//! inherits the parent's node registry and variables; the `data["variables"]`
6//! map (if present) extends or overrides them.
7//!
8//! The node output is a JSON object whose keys are the sub-flow's node IDs and
9//! whose values are those nodes' outputs — identical in shape to
10//! [`FlowResult::outputs`](crate::result::FlowResult::outputs).
11//!
12//! # Flow definition example
13//!
14//! ```json
15//! {
16//!   "id": "call-summarizer",
17//!   "type": "sub-flow",
18//!   "data": {
19//!     "name": "summarizer-flow",
20//!     "variables": { "max_tokens": 256 }
21//!   }
22//! }
23//! ```
24//!
25//! # Errors
26//!
//! - [`FlowError::InvalidDefinition`] — `data["name"]` is missing or is not a string.
//! - [`FlowError::Internal`] — no [`FlowStore`] is configured on the engine.
28//! - [`FlowError::FlowNotFound`] — no flow with `name` exists in the store.
29//! - Any error propagated from the sub-flow's own execution.
30
31use std::collections::HashMap;
32use std::sync::Arc;
33
34use async_trait::async_trait;
35use serde_json::Value;
36
37use crate::error::{FlowError, Result};
38use crate::graph::DagGraph;
39use crate::node::{ExecContext, Node};
40use crate::runner::FlowRunner;
41
42pub struct SubFlowNode;
43
44#[async_trait]
45impl Node for SubFlowNode {
46    fn node_type(&self) -> &str {
47        "sub-flow"
48    }
49
50    async fn execute(&self, ctx: ExecContext) -> Result<Value> {
51        let name = ctx
52            .data
53            .get("name")
54            .and_then(|v| v.as_str())
55            .ok_or_else(|| {
56                FlowError::InvalidDefinition(
57                    "sub-flow node requires a \"name\" string in data".into(),
58                )
59            })?;
60
61        let store = ctx.flow_store.as_ref().ok_or_else(|| {
62            FlowError::Internal(
63                "sub-flow node requires a FlowStore; configure one via \
64                 FlowEngine::with_flow_store"
65                    .into(),
66            )
67        })?;
68
69        let definition = store
70            .load(name)
71            .await?
72            .ok_or_else(|| FlowError::FlowNotFound(name.to_string()))?;
73
74        let dag = DagGraph::from_json(&definition)?;
75
76        // Inherit parent variables; let data["variables"] extend/override them.
77        let mut variables: HashMap<String, Value> = ctx.variables.clone();
78        if let Some(overrides) = ctx.data.get("variables").and_then(|v| v.as_object()) {
79            for (k, v) in overrides {
80                variables.insert(k.clone(), v.clone());
81            }
82        }
83
84        let mut runner = FlowRunner::with_arc_registry(dag, Arc::clone(&ctx.registry));
85        if let Some(fs) = ctx.flow_store {
86            runner = runner.with_flow_store(fs);
87        }
88
89        let result = runner.run(variables).await?;
90
91        // Return the sub-flow's per-node outputs as a single JSON object.
92        Ok(Value::Object(result.outputs.into_iter().collect()))
93    }
94}
95
#[cfg(test)]
mod tests {
    use super::*;
    use crate::flow_store::{FlowStore, MemoryFlowStore};
    use serde_json::json;
    use std::sync::Arc;

    // Happy path: a stored single-node flow runs and its node ID shows up as a
    // key in the sub-flow node's output.
    #[tokio::test]
    async fn sub_flow_runs_named_flow_and_returns_outputs() {
        let flows = Arc::new(MemoryFlowStore::new());
        let inner_flow = json!({
            "nodes": [{ "id": "x", "type": "noop" }],
            "edges": []
        });
        flows.save("inner", &inner_flow).await.unwrap();

        let ctx = ExecContext {
            data: json!({ "name": "inner" }),
            flow_store: Some(flows),
            ..Default::default()
        };

        let outputs = SubFlowNode.execute(ctx).await.unwrap();
        // Only the presence of the inner node's entry is checked here.
        assert!(
            outputs.get("x").is_some(),
            "expected an output entry for inner node \"x\""
        );
    }

    // `data` without a "name" field is rejected as an invalid definition.
    #[tokio::test]
    async fn sub_flow_missing_name_returns_error() {
        let ctx = ExecContext {
            data: json!({}),
            ..Default::default()
        };
        let err = SubFlowNode.execute(ctx).await.unwrap_err();
        assert!(matches!(err, FlowError::InvalidDefinition(_)));
    }

    // A valid name with no flow store configured yields an internal error.
    #[tokio::test]
    async fn sub_flow_no_store_returns_internal_error() {
        let ctx = ExecContext {
            data: json!({ "name": "any" }),
            flow_store: None,
            ..Default::default()
        };
        let err = SubFlowNode.execute(ctx).await.unwrap_err();
        assert!(matches!(err, FlowError::Internal(_)));
    }

    // Looking up a name the store does not contain yields `FlowNotFound`.
    #[tokio::test]
    async fn sub_flow_unknown_name_returns_flow_not_found() {
        let empty_store = Arc::new(MemoryFlowStore::new());
        let ctx = ExecContext {
            data: json!({ "name": "nonexistent" }),
            flow_store: Some(empty_store),
            ..Default::default()
        };
        let err = SubFlowNode.execute(ctx).await.unwrap_err();
        assert!(matches!(err, FlowError::FlowNotFound(_)));
    }

    // The node-level "variables" object takes precedence over a parent
    // variable stored under the same key.
    #[tokio::test]
    async fn sub_flow_inherits_and_overrides_variables() {
        use crate::node::Node as _;
        use crate::registry::NodeRegistry;

        // Inner flow: a "code" node that reads variable `x` and returns it.
        let flows = Arc::new(MemoryFlowStore::new());
        let reader_flow = json!({
            "nodes": [{
                "id": "read",
                "type": "code",
                "data": {
                    "language": "rhai",
                    "code": "let v = variables[\"x\"]; #{result: v}"
                }
            }],
            "edges": []
        });
        flows.save("var-flow", &reader_flow).await.unwrap();

        // The parent supplies x = 10; the node data below overrides it with 42.
        let parent_vars: HashMap<String, Value> =
            std::iter::once(("x".to_string(), json!(10))).collect();

        let ctx = ExecContext {
            data: json!({
                "name": "var-flow",
                "variables": { "x": 42 }
            }),
            variables: parent_vars,
            flow_store: Some(flows),
            registry: Arc::new(NodeRegistry::with_defaults()),
            ..Default::default()
        };

        let outputs = SubFlowNode.execute(ctx).await.unwrap();
        assert_eq!(outputs["read"]["result"], json!(42));
    }
}