Skip to main content

a3s_flow/nodes/
end.rs

1//! `"end"` node — Dify-compatible flow output collector.
2//!
3//! Gathers values from upstream node outputs and returns them as a named
4//! output map. Place an `"end"` node at the sink of your DAG; its output
5//! in `FlowResult::outputs["end"]` represents the flow's final answer.
6//!
7//! Values are located using JSON pointer strings
8//! (`/upstream_node_id/field/nested`) resolved against the set of upstream
9//! node outputs available via `ctx.inputs`.
10//!
11//! # Config schema
12//!
13//! ```json
14//! {
15//!   "outputs": {
16//!     "answer":       "/llm/text",
17//!     "total_tokens": "/llm/usage/total_tokens",
18//!     "raw_result":   "/transform"
19//!   }
20//! }
21//! ```
22//!
23//! | Field | Type | Required | Description |
24//! |-------|------|----------|-------------|
25//! | `outputs` | object | — | Map of output name → JSON pointer path. If absent, `ctx.inputs` is returned as-is. |
26//!
27//! The JSON pointer `/upstream_node/field` is resolved against a virtual
28//! object whose top-level keys are the direct upstream node IDs. Missing
29//! paths resolve to `null`.
30//!
31//! # Output schema
32//!
33//! ```json
34//! { "answer": "...", "total_tokens": 512 }
35//! ```
36//!
37//! # Example
38//!
39//! ```json
40//! {
41//!   "nodes": [
42//!     { "id": "llm",   "type": "noop" },
43//!     { "id": "end",   "type": "end",
44//!       "data": { "outputs": { "reply": "/llm" } } }
45//!   ],
46//!   "edges": [{ "source": "llm", "target": "end" }]
47//! }
48//! ```
49
50use async_trait::async_trait;
51use serde_json::Value;
52
53use crate::error::{FlowError, Result};
54use crate::node::{ExecContext, Node};
55
56pub struct EndNode;
57
58#[async_trait]
59impl Node for EndNode {
60    fn node_type(&self) -> &str {
61        "end"
62    }
63
64    async fn execute(&self, ctx: ExecContext) -> Result<Value> {
65        let Some(outputs_cfg) = ctx.data.get("outputs") else {
66            // No config → return all upstream inputs as-is.
67            return Ok(Value::Object(ctx.inputs.into_iter().collect()));
68        };
69
70        let outputs_map = outputs_cfg.as_object().ok_or_else(|| {
71            FlowError::InvalidDefinition("end node: \"outputs\" must be a JSON object".into())
72        })?;
73
74        // Build a virtual object from ctx.inputs so JSON pointer resolution works.
75        let source: Value = Value::Object(ctx.inputs.into_iter().collect());
76
77        let mut result = serde_json::Map::new();
78        for (name, ptr_val) in outputs_map {
79            let ptr = ptr_val.as_str().ok_or_else(|| {
80                FlowError::InvalidDefinition(format!(
81                    "end node: output '{name}' path must be a string"
82                ))
83            })?;
84            let value = source.pointer(ptr).cloned().unwrap_or(Value::Null);
85            result.insert(name.clone(), value);
86        }
87
88        Ok(Value::Object(result))
89    }
90}
91
92#[cfg(test)]
93mod tests {
94    use super::*;
95    use serde_json::json;
96    use std::collections::HashMap;
97
98    fn ctx(inputs: HashMap<String, Value>, data: Value) -> ExecContext {
99        ExecContext {
100            data,
101            inputs,
102            ..Default::default()
103        }
104    }
105
106    #[tokio::test]
107    async fn no_config_returns_all_inputs() {
108        let node = EndNode;
109        let inputs = HashMap::from([("a".into(), json!({ "x": 1 }))]);
110        let out = node.execute(ctx(inputs, json!({}))).await.unwrap();
111        assert_eq!(out["a"]["x"], json!(1));
112    }
113
114    #[tokio::test]
115    async fn pointer_extracts_nested_field() {
116        let node = EndNode;
117        let inputs = HashMap::from([(
118            "llm".into(),
119            json!({ "text": "hello", "usage": { "tokens": 42 } }),
120        )]);
121        let out = node
122            .execute(ctx(
123                inputs,
124                json!({ "outputs": { "reply": "/llm/text", "tokens": "/llm/usage/tokens" } }),
125            ))
126            .await
127            .unwrap();
128        assert_eq!(out["reply"], json!("hello"));
129        assert_eq!(out["tokens"], json!(42));
130    }
131
132    #[tokio::test]
133    async fn missing_path_resolves_to_null() {
134        let node = EndNode;
135        let inputs = HashMap::from([("a".into(), json!({}))]);
136        let out = node
137            .execute(ctx(inputs, json!({ "outputs": { "x": "/a/nonexistent" } })))
138            .await
139            .unwrap();
140        assert!(out["x"].is_null());
141    }
142
143    #[tokio::test]
144    async fn pointer_to_entire_upstream_node() {
145        let node = EndNode;
146        let upstream = json!({ "val": 99 });
147        let inputs = HashMap::from([("step".into(), upstream.clone())]);
148        let out = node
149            .execute(ctx(inputs, json!({ "outputs": { "result": "/step" } })))
150            .await
151            .unwrap();
152        assert_eq!(out["result"], upstream);
153    }
154
155    #[tokio::test]
156    async fn invalid_outputs_config_returns_error() {
157        let node = EndNode;
158        let result = node
159            .execute(ctx(HashMap::new(), json!({ "outputs": "not_an_object" })))
160            .await;
161        assert!(matches!(result, Err(FlowError::InvalidDefinition(_))));
162    }
163
164    #[tokio::test]
165    async fn non_string_path_returns_error() {
166        let node = EndNode;
167        let result = node
168            .execute(ctx(HashMap::new(), json!({ "outputs": { "x": 42 } })))
169            .await;
170        assert!(matches!(result, Err(FlowError::InvalidDefinition(_))));
171    }
172}