1use async_trait::async_trait;
51use serde_json::Value;
52
53use crate::error::{FlowError, Result};
54use crate::node::{ExecContext, Node};
55
/// Terminal node of a flow.
///
/// The type carries no state of its own; all behavior lives in its
/// [`Node`] implementation, which assembles the flow's final output from the
/// upstream inputs handed to it at execution time.
#[derive(Debug, Clone, Copy, Default)]
pub struct EndNode;
57
58#[async_trait]
59impl Node for EndNode {
60 fn node_type(&self) -> &str {
61 "end"
62 }
63
64 async fn execute(&self, ctx: ExecContext) -> Result<Value> {
65 let Some(outputs_cfg) = ctx.data.get("outputs") else {
66 return Ok(Value::Object(ctx.inputs.into_iter().collect()));
68 };
69
70 let outputs_map = outputs_cfg.as_object().ok_or_else(|| {
71 FlowError::InvalidDefinition("end node: \"outputs\" must be a JSON object".into())
72 })?;
73
74 let source: Value = Value::Object(ctx.inputs.into_iter().collect());
76
77 let mut result = serde_json::Map::new();
78 for (name, ptr_val) in outputs_map {
79 let ptr = ptr_val.as_str().ok_or_else(|| {
80 FlowError::InvalidDefinition(format!(
81 "end node: output '{name}' path must be a string"
82 ))
83 })?;
84 let value = source.pointer(ptr).cloned().unwrap_or(Value::Null);
85 result.insert(name.clone(), value);
86 }
87
88 Ok(Value::Object(result))
89 }
90}
91
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::collections::HashMap;

    /// Builds an inputs map holding exactly one upstream entry.
    fn single(key: &str, value: Value) -> HashMap<String, Value> {
        HashMap::from([(key.to_owned(), value)])
    }

    /// Executes an `EndNode` with the given upstream `inputs` and node `data`;
    /// every other context field keeps its default value.
    async fn run(inputs: HashMap<String, Value>, data: Value) -> Result<Value> {
        let context = ExecContext {
            inputs,
            data,
            ..Default::default()
        };
        let node = EndNode;
        node.execute(context).await
    }

    // Without an "outputs" entry the node forwards all inputs untouched.
    #[tokio::test]
    async fn no_config_returns_all_inputs() {
        let output = run(single("a", json!({ "x": 1 })), json!({}))
            .await
            .unwrap();
        assert_eq!(output["a"]["x"], json!(1));
    }

    // Pointers may reach into nested fields of an upstream result.
    #[tokio::test]
    async fn pointer_extracts_nested_field() {
        let upstream = json!({ "text": "hello", "usage": { "tokens": 42 } });
        let config = json!({ "outputs": { "reply": "/llm/text", "tokens": "/llm/usage/tokens" } });

        let output = run(single("llm", upstream), config).await.unwrap();

        assert_eq!(output["reply"], json!("hello"));
        assert_eq!(output["tokens"], json!(42));
    }

    // A pointer that matches nothing is reported as null, not as an error.
    #[tokio::test]
    async fn missing_path_resolves_to_null() {
        let config = json!({ "outputs": { "x": "/a/nonexistent" } });

        let output = run(single("a", json!({})), config).await.unwrap();

        assert!(output["x"].is_null());
    }

    // A pointer naming only the upstream key selects that node's whole result.
    #[tokio::test]
    async fn pointer_to_entire_upstream_node() {
        let upstream = json!({ "val": 99 });
        let config = json!({ "outputs": { "result": "/step" } });

        let output = run(single("step", upstream.clone()), config)
            .await
            .unwrap();

        assert_eq!(output["result"], upstream);
    }

    // "outputs" must be an object; any other JSON type is a definition error.
    #[tokio::test]
    async fn invalid_outputs_config_returns_error() {
        let outcome = run(HashMap::new(), json!({ "outputs": "not_an_object" })).await;
        assert!(matches!(outcome, Err(FlowError::InvalidDefinition(_))));
    }

    // Each configured path must be a string; a number is a definition error.
    #[tokio::test]
    async fn non_string_path_returns_error() {
        let outcome = run(HashMap::new(), json!({ "outputs": { "x": 42 } })).await;
        assert!(matches!(outcome, Err(FlowError::InvalidDefinition(_))));
    }
}