Skip to main content

a3s_flow/nodes/
start.rs

1//! `"start"` node — Dify-compatible flow entry point with input declaration.
2//!
3//! Declares the flow's expected input variables, applies defaults for any that
4//! are absent, and emits the resolved set as the node's output. Place a single
5//! `"start"` node at the root of your DAG; downstream nodes access inputs via
6//! `ctx.inputs["start"]["variable_name"]`.
7//!
8//! # Config schema
9//!
10//! ```json
11//! {
12//!   "inputs": [
13//!     { "name": "query",      "type": "string"  },
14//!     { "name": "max_tokens", "type": "number",  "default": 256 },
15//!     { "name": "verbose",    "type": "bool",    "default": false }
16//!   ]
17//! }
18//! ```
19//!
20//! | Field | Type | Required | Description |
21//! |-------|------|----------|-------------|
22//! | `inputs` | array | — | Input variable declarations (empty = pass all variables through) |
23//! | `inputs[].name` | string | ✅ | Variable name |
24//! | `inputs[].type` | `"string"` \| `"number"` \| `"bool"` \| `"object"` \| `"array"` | — | Expected type (used for validation) |
25//! | `inputs[].default` | any | — | Value used when the variable is absent from the flow's `variables` map |
26//!
27//! # Output schema
28//!
29//! The output is a JSON object containing the resolved value for each declared
30//! input. If no `inputs` array is declared, the output is the entire
31//! `variables` map passed to the engine.
32//!
33//! # Errors
34//!
//! - [`FlowError::InvalidDefinition`] — `inputs` is present but not a valid array
//!   of declarations, or a supplied variable's JSON kind does not match its
//!   declared `type`.
//! - [`FlowError::Internal`] — a required input (no default) is missing from the
//!   flow's `variables` map.
38
39use async_trait::async_trait;
40use serde::Deserialize;
41use serde_json::Value;
42
43use crate::error::{FlowError, Result};
44use crate::node::{ExecContext, Node};
45
46#[derive(Debug, Deserialize)]
47struct InputDecl {
48    name: String,
49    #[serde(rename = "type", default)]
50    type_hint: String,
51    default: Option<Value>,
52}
53
54pub struct StartNode;
55
56#[async_trait]
57impl Node for StartNode {
58    fn node_type(&self) -> &str {
59        "start"
60    }
61
62    async fn execute(&self, ctx: ExecContext) -> Result<Value> {
63        let decls_val = ctx.data.get("inputs");
64
65        // No declaration → pass all variables through.
66        let Some(decls_val) = decls_val else {
67            return Ok(Value::Object(ctx.variables.into_iter().collect()));
68        };
69
70        let decls: Vec<InputDecl> = serde_json::from_value(decls_val.clone()).map_err(|e| {
71            FlowError::InvalidDefinition(format!("start node: invalid inputs declaration: {e}"))
72        })?;
73
74        if decls.is_empty() {
75            return Ok(Value::Object(ctx.variables.into_iter().collect()));
76        }
77
78        let mut output = serde_json::Map::new();
79        for decl in &decls {
80            let val = match ctx.variables.get(&decl.name).cloned() {
81                Some(v) => {
82                    validate_type(&decl.name, &v, &decl.type_hint)?;
83                    v
84                }
85                None => decl.default.clone().ok_or_else(|| {
86                    FlowError::Internal(format!(
87                        "start node: required input '{}' not provided",
88                        decl.name
89                    ))
90                })?,
91            };
92            output.insert(decl.name.clone(), val);
93        }
94
95        Ok(Value::Object(output))
96    }
97}
98
99/// Light type validation — checks the JSON value kind matches the declared type.
100///
101/// Silently passes if `type_hint` is empty or unrecognised.
102fn validate_type(name: &str, value: &Value, type_hint: &str) -> Result<()> {
103    let ok = match type_hint {
104        "string" => value.is_string(),
105        "number" => value.is_number(),
106        "bool" => value.is_boolean(),
107        "object" => value.is_object(),
108        "array" => value.is_array(),
109        _ => true, // unknown or empty hint — no validation
110    };
111
112    if !ok {
113        return Err(FlowError::InvalidDefinition(format!(
114            "start node: input '{name}' expected type '{type_hint}', got {}",
115            value_kind(value)
116        )));
117    }
118    Ok(())
119}
120
121fn value_kind(v: &Value) -> &'static str {
122    match v {
123        Value::Null => "null",
124        Value::Bool(_) => "bool",
125        Value::Number(_) => "number",
126        Value::String(_) => "string",
127        Value::Array(_) => "array",
128        Value::Object(_) => "object",
129    }
130}
131
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::collections::HashMap;

    /// Builds an `ExecContext` carrying the given flow variables and node
    /// config, leaving every other field at its default.
    fn ctx(variables: HashMap<String, Value>, data: Value) -> ExecContext {
        ExecContext {
            data,
            variables,
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn no_declaration_returns_all_variables() {
        let node = StartNode;
        let vars = HashMap::from([("x".into(), json!(1)), ("y".into(), json!("hello"))]);
        let out = node.execute(ctx(vars, json!({}))).await.unwrap();
        assert_eq!(out["x"], json!(1));
        assert_eq!(out["y"], json!("hello"));
    }

    #[tokio::test]
    async fn declared_inputs_resolved_from_variables() {
        let node = StartNode;
        let vars = HashMap::from([("q".into(), json!("hello"))]);
        let out = node
            .execute(ctx(
                vars,
                json!({ "inputs": [{ "name": "q", "type": "string" }] }),
            ))
            .await
            .unwrap();
        assert_eq!(out["q"], json!("hello"));
    }

    #[tokio::test]
    async fn default_applied_when_variable_absent() {
        let node = StartNode;
        let out = node
            .execute(ctx(
                HashMap::new(),
                json!({
                    "inputs": [
                        { "name": "n", "type": "number", "default": 42 }
                    ]
                }),
            ))
            .await
            .unwrap();
        assert_eq!(out["n"], json!(42));
    }

    #[tokio::test]
    async fn missing_required_input_returns_error() {
        let node = StartNode;
        let result = node
            .execute(ctx(
                HashMap::new(),
                json!({ "inputs": [{ "name": "required_var", "type": "string" }] }),
            ))
            .await;
        assert!(matches!(result, Err(FlowError::Internal(_))));
    }

    #[tokio::test]
    async fn type_mismatch_returns_invalid_definition() {
        let node = StartNode;
        let vars = HashMap::from([("x".into(), json!(42))]);
        let result = node
            .execute(ctx(
                vars,
                json!({ "inputs": [{ "name": "x", "type": "string" }] }),
            ))
            .await;
        assert!(matches!(result, Err(FlowError::InvalidDefinition(_))));
    }

    #[tokio::test]
    async fn unknown_type_hint_is_accepted() {
        let node = StartNode;
        let vars = HashMap::from([("x".into(), json!(true))]);
        let out = node
            .execute(ctx(
                vars,
                json!({ "inputs": [{ "name": "x", "type": "custom_type" }] }),
            ))
            .await
            .unwrap();
        assert_eq!(out["x"], json!(true));
    }

    #[tokio::test]
    async fn empty_inputs_array_returns_all_variables() {
        let node = StartNode;
        let vars = HashMap::from([("a".into(), json!(1))]);
        let out = node
            .execute(ctx(vars, json!({ "inputs": [] })))
            .await
            .unwrap();
        assert_eq!(out["a"], json!(1));
    }

    // With a non-empty declaration list the output holds only declared
    // inputs; other flow variables must not leak through.
    #[tokio::test]
    async fn declared_inputs_exclude_undeclared_variables() {
        let node = StartNode;
        let vars = HashMap::from([
            ("q".into(), json!("hello")),
            ("extra".into(), json!("ignored")),
        ]);
        let out = node
            .execute(ctx(
                vars,
                json!({ "inputs": [{ "name": "q", "type": "string" }] }),
            ))
            .await
            .unwrap();
        assert_eq!(out["q"], json!("hello"));
        assert!(out.get("extra").is_none());
    }

    // A supplied variable wins over the declared default.
    #[tokio::test]
    async fn provided_value_takes_precedence_over_default() {
        let node = StartNode;
        let vars = HashMap::from([("n".into(), json!(7))]);
        let out = node
            .execute(ctx(
                vars,
                json!({
                    "inputs": [
                        { "name": "n", "type": "number", "default": 42 }
                    ]
                }),
            ))
            .await
            .unwrap();
        assert_eq!(out["n"], json!(7));
    }

    // `inputs` must be an array of declarations; anything else is rejected.
    #[tokio::test]
    async fn non_array_inputs_returns_invalid_definition() {
        let node = StartNode;
        let result = node
            .execute(ctx(HashMap::new(), json!({ "inputs": "not-an-array" })))
            .await;
        assert!(matches!(result, Err(FlowError::InvalidDefinition(_))));
    }

    // `name` is the only mandatory field of a declaration.
    #[tokio::test]
    async fn declaration_without_name_returns_invalid_definition() {
        let node = StartNode;
        let result = node
            .execute(ctx(
                HashMap::new(),
                json!({ "inputs": [{ "type": "string" }] }),
            ))
            .await;
        assert!(matches!(result, Err(FlowError::InvalidDefinition(_))));
    }
}
235}