1use serde_json::Value;
12use worldinterface_connector::transform::resolve_path;
13use worldinterface_contextstore::ContextStore;
14use worldinterface_core::id::{trigger_input_node_id, FlowRunId, NodeId};
15
16use crate::error::ResolveError;
17
18pub fn resolve_params(
23 params: &Value,
24 flow_run_id: FlowRunId,
25 flow_params: Option<&Value>,
26 store: &dyn ContextStore,
27) -> Result<Value, ResolveError> {
28 match params {
29 Value::String(s) => resolve_string(s, flow_run_id, flow_params, store),
30 Value::Object(map) => {
31 let mut resolved = serde_json::Map::new();
32 for (key, value) in map {
33 resolved
34 .insert(key.clone(), resolve_params(value, flow_run_id, flow_params, store)?);
35 }
36 Ok(Value::Object(resolved))
37 }
38 Value::Array(arr) => {
39 let resolved: Result<Vec<Value>, ResolveError> =
40 arr.iter().map(|v| resolve_params(v, flow_run_id, flow_params, store)).collect();
41 Ok(Value::Array(resolved?))
42 }
43 other => Ok(other.clone()),
45 }
46}
47
48fn resolve_string(
50 s: &str,
51 flow_run_id: FlowRunId,
52 flow_params: Option<&Value>,
53 store: &dyn ContextStore,
54) -> Result<Value, ResolveError> {
55 if !s.contains("{{") {
57 return Ok(Value::String(s.to_string()));
58 }
59
60 let trimmed = s.trim();
62 if trimmed.starts_with("{{") && trimmed.ends_with("}}") {
63 let inner = &trimmed[2..trimmed.len() - 2];
65 if !inner.contains("{{") && !inner.contains("}}") {
66 return resolve_reference(inner.trim(), flow_run_id, flow_params, store);
67 }
68 }
69
70 let mut result = String::new();
72 let mut remaining = s;
73
74 while let Some(start) = remaining.find("{{") {
75 result.push_str(&remaining[..start]);
76 let after_open = &remaining[start + 2..];
77 if let Some(end) = after_open.find("}}") {
78 let ref_str = after_open[..end].trim();
79 let value = resolve_reference(ref_str, flow_run_id, flow_params, store)?;
80 result.push_str(&value_to_string(&value));
81 remaining = &after_open[end + 2..];
82 } else {
83 result.push_str("{{");
85 remaining = after_open;
86 }
87 }
88 result.push_str(remaining);
89
90 Ok(Value::String(result))
91}
92
93fn resolve_reference(
95 reference: &str,
96 flow_run_id: FlowRunId,
97 flow_params: Option<&Value>,
98 store: &dyn ContextStore,
99) -> Result<Value, ResolveError> {
100 if let Some(rest) = reference.strip_prefix("nodes.") {
101 resolve_node_reference(rest, flow_run_id, store)
102 } else if let Some(rest) = reference.strip_prefix("params.") {
103 resolve_param_reference(rest, flow_params)
104 } else if let Some(rest) = reference.strip_prefix("trigger.") {
105 resolve_trigger_reference(rest, flow_run_id, store)
106 } else {
107 Err(ResolveError::FlowParamNotFound { path: reference.to_string() })
109 }
110}
111
112fn resolve_node_reference(
114 rest: &str,
115 flow_run_id: FlowRunId,
116 store: &dyn ContextStore,
117) -> Result<Value, ResolveError> {
118 if rest.len() < 36 {
120 return Err(ResolveError::FlowParamNotFound { path: format!("nodes.{rest}") });
121 }
122
123 let node_id_str = &rest[..36];
124 let node_id: NodeId = node_id_str
125 .parse::<uuid::Uuid>()
126 .map(NodeId::from)
127 .map_err(|_| ResolveError::FlowParamNotFound { path: format!("nodes.{rest}") })?;
128
129 let output =
130 store.get(flow_run_id, node_id)?.ok_or(ResolveError::NodeOutputNotFound { node_id })?;
131
132 let after_uuid = &rest[36..];
134 if after_uuid.is_empty() {
135 return Ok(output);
136 }
137
138 let path = after_uuid.strip_prefix('.').unwrap_or(after_uuid);
140
141 let path = path
143 .strip_prefix("output.")
144 .or_else(|| if path == "output" { Some("") } else { Some(path) })
145 .unwrap_or(path);
146
147 if path.is_empty() {
148 return Ok(output);
149 }
150
151 resolve_path(&output, path)
152 .cloned()
153 .ok_or_else(|| ResolveError::PathNotFound { node_id, path: path.to_string() })
154}
155
156fn resolve_param_reference(path: &str, flow_params: Option<&Value>) -> Result<Value, ResolveError> {
158 let params =
159 flow_params.ok_or_else(|| ResolveError::FlowParamNotFound { path: path.to_string() })?;
160
161 resolve_path(params, path)
162 .cloned()
163 .ok_or_else(|| ResolveError::FlowParamNotFound { path: path.to_string() })
164}
165
166fn resolve_trigger_reference(
168 path: &str,
169 flow_run_id: FlowRunId,
170 store: &dyn ContextStore,
171) -> Result<Value, ResolveError> {
172 let trigger_node_id = trigger_input_node_id();
173 let trigger_data =
174 store.get(flow_run_id, trigger_node_id)?.ok_or(ResolveError::TriggerInputNotFound)?;
175
176 if path.is_empty() {
177 return Ok(trigger_data);
178 }
179
180 resolve_path(&trigger_data, path)
181 .cloned()
182 .ok_or_else(|| ResolveError::FlowParamNotFound { path: format!("trigger.{}", path) })
183}
184
185fn value_to_string(value: &Value) -> String {
187 match value {
188 Value::String(s) => s.clone(),
189 Value::Null => "null".to_string(),
190 Value::Bool(b) => b.to_string(),
191 Value::Number(n) => n.to_string(),
192 other => other.to_string(),
194 }
195}
196
197#[cfg(test)]
198mod tests {
199 use serde_json::json;
200 use worldinterface_contextstore::SqliteContextStore;
201
202 use super::*;
203
204 fn make_store() -> SqliteContextStore {
205 SqliteContextStore::in_memory().unwrap()
206 }
207
208 #[test]
211 fn resolve_no_templates() {
212 let store = make_store();
213 let input = json!({"key": "plain value", "num": 42});
214 let result = resolve_params(&input, FlowRunId::new(), None, &store).unwrap();
215 assert_eq!(result, input);
216 }
217
218 #[test]
219 fn resolve_simple_node_ref() {
220 let store = make_store();
221 let fr = FlowRunId::new();
222 let node_id = NodeId::new();
223 store.put(fr, node_id, &json!({"field": "hello"})).unwrap();
224
225 let template = format!("{{{{nodes.{}.field}}}}", node_id.as_ref());
226 let input = Value::String(template);
227 let result = resolve_params(&input, fr, None, &store).unwrap();
228 assert_eq!(result, json!("hello"));
229 }
230
231 #[test]
232 fn resolve_flow_param() {
233 let store = make_store();
234 let params = json!({"key": "value"});
235 let input = json!("{{params.key}}");
236 let result = resolve_params(&input, FlowRunId::new(), Some(¶ms), &store).unwrap();
237 assert_eq!(result, json!("value"));
238 }
239
240 #[test]
241 fn resolve_nested_path() {
242 let store = make_store();
243 let fr = FlowRunId::new();
244 let node_id = NodeId::new();
245 store.put(fr, node_id, &json!({"a": {"b": {"c": 42}}})).unwrap();
246
247 let template = format!("{{{{nodes.{}.a.b.c}}}}", node_id.as_ref());
248 let input = Value::String(template);
249 let result = resolve_params(&input, fr, None, &store).unwrap();
250 assert_eq!(result, json!(42));
251 }
252
253 #[test]
254 fn resolve_full_replacement_preserves_type() {
255 let store = make_store();
256 let fr = FlowRunId::new();
257 let node_id = NodeId::new();
258 let obj = json!({"nested": [1, 2, 3], "flag": true});
259 store.put(fr, node_id, &obj).unwrap();
260
261 let template = format!("{{{{nodes.{}}}}}", node_id.as_ref());
262 let input = Value::String(template);
263 let result = resolve_params(&input, fr, None, &store).unwrap();
264 assert_eq!(result, obj);
265 }
266
267 #[test]
268 fn resolve_string_interpolation() {
269 let store = make_store();
270 let params = json!({"id": "abc123"});
271 let input = json!("prefix-{{params.id}}-suffix");
272 let result = resolve_params(&input, FlowRunId::new(), Some(¶ms), &store).unwrap();
273 assert_eq!(result, json!("prefix-abc123-suffix"));
274 }
275
276 #[test]
277 fn resolve_nested_object() {
278 let store = make_store();
279 let params = json!({"host": "example.com", "port": 8080});
280 let input = json!({
281 "url": "https://{{params.host}}:{{params.port}}/api",
282 "nested": {
283 "value": "{{params.host}}"
284 }
285 });
286 let result = resolve_params(&input, FlowRunId::new(), Some(¶ms), &store).unwrap();
287 assert_eq!(
288 result,
289 json!({
290 "url": "https://example.com:8080/api",
291 "nested": {
292 "value": "example.com"
293 }
294 })
295 );
296 }
297
298 #[test]
299 fn resolve_missing_node_output() {
300 let store = make_store();
301 let fr = FlowRunId::new();
302 let node_id = NodeId::new();
303 let template = format!("{{{{nodes.{}}}}}", node_id.as_ref());
304 let input = Value::String(template);
305 let result = resolve_params(&input, fr, None, &store);
306 assert!(matches!(result, Err(ResolveError::NodeOutputNotFound { .. })));
307 }
308
309 #[test]
310 fn resolve_missing_path_in_output() {
311 let store = make_store();
312 let fr = FlowRunId::new();
313 let node_id = NodeId::new();
314 store.put(fr, node_id, &json!({"a": 1})).unwrap();
315
316 let template = format!("{{{{nodes.{}.nonexistent}}}}", node_id.as_ref());
317 let input = Value::String(template);
318 let result = resolve_params(&input, fr, None, &store);
319 assert!(matches!(result, Err(ResolveError::PathNotFound { .. })));
320 }
321
322 #[test]
323 fn resolve_missing_flow_param() {
324 let store = make_store();
325 let params = json!({"a": 1});
326 let input = json!("{{params.nonexistent}}");
327 let result = resolve_params(&input, FlowRunId::new(), Some(¶ms), &store);
328 assert!(matches!(result, Err(ResolveError::FlowParamNotFound { .. })));
329 }
330
331 #[test]
332 fn resolve_no_flow_params_provided() {
333 let store = make_store();
334 let input = json!("{{params.key}}");
335 let result = resolve_params(&input, FlowRunId::new(), None, &store);
336 assert!(matches!(result, Err(ResolveError::FlowParamNotFound { .. })));
337 }
338
339 #[test]
340 fn resolve_with_output_prefix() {
341 let store = make_store();
342 let fr = FlowRunId::new();
343 let node_id = NodeId::new();
344 store.put(fr, node_id, &json!({"field": "value"})).unwrap();
345
346 let template = format!("{{{{nodes.{}.output.field}}}}", node_id.as_ref());
347 let input = Value::String(template);
348 let result = resolve_params(&input, fr, None, &store).unwrap();
349 assert_eq!(result, json!("value"));
350 }
351
352 #[test]
355 fn resolve_trigger_body_field() {
356 let store = make_store();
357 let fr = FlowRunId::new();
358 let trigger_data = json!({
359 "body": {"event": "push"},
360 "headers": {"content-type": "application/json"},
361 "method": "POST",
362 "path": "github/push",
363 "received_at": 1741200000
364 });
365 store.put(fr, trigger_input_node_id(), &trigger_data).unwrap();
366
367 let input = json!("{{trigger.body.event}}");
368 let result = resolve_params(&input, fr, None, &store).unwrap();
369 assert_eq!(result, json!("push"));
370 }
371
372 #[test]
373 fn resolve_trigger_headers() {
374 let store = make_store();
375 let fr = FlowRunId::new();
376 let trigger_data = json!({
377 "body": {},
378 "headers": {"content-type": "application/json"},
379 "method": "POST",
380 "path": "test",
381 "received_at": 0
382 });
383 store.put(fr, trigger_input_node_id(), &trigger_data).unwrap();
384
385 let input = json!("{{trigger.headers.content-type}}");
386 let result = resolve_params(&input, fr, None, &store).unwrap();
387 assert_eq!(result, json!("application/json"));
388 }
389
390 #[test]
391 fn resolve_trigger_full_body() {
392 let store = make_store();
393 let fr = FlowRunId::new();
394 let body = json!({"message": "hello", "count": 5});
395 let trigger_data = json!({
396 "body": body,
397 "headers": {},
398 "method": "POST",
399 "path": "test",
400 "received_at": 0
401 });
402 store.put(fr, trigger_input_node_id(), &trigger_data).unwrap();
403
404 let input = json!("{{trigger.body}}");
405 let result = resolve_params(&input, fr, None, &store).unwrap();
406 assert_eq!(result, json!({"message": "hello", "count": 5}));
407 }
408
409 #[test]
410 fn resolve_trigger_missing_returns_error() {
411 let store = make_store();
412 let fr = FlowRunId::new();
413 let input = json!("{{trigger.body}}");
415 let result = resolve_params(&input, fr, None, &store);
416 assert!(matches!(result, Err(ResolveError::TriggerInputNotFound)));
417 }
418
419 #[test]
420 fn resolve_trigger_nested_path() {
421 let store = make_store();
422 let fr = FlowRunId::new();
423 let trigger_data = json!({
424 "body": {"payload": {"action": "opened"}},
425 "headers": {},
426 "method": "POST",
427 "path": "test",
428 "received_at": 0
429 });
430 store.put(fr, trigger_input_node_id(), &trigger_data).unwrap();
431
432 let input = json!("{{trigger.body.payload.action}}");
433 let result = resolve_params(&input, fr, None, &store).unwrap();
434 assert_eq!(result, json!("opened"));
435 }
436
437 #[test]
438 fn resolve_array_with_templates() {
439 let store = make_store();
440 let params = json!({"a": "x", "b": "y"});
441 let input = json!(["{{params.a}}", "{{params.b}}", "literal"]);
442 let result = resolve_params(&input, FlowRunId::new(), Some(¶ms), &store).unwrap();
443 assert_eq!(result, json!(["x", "y", "literal"]));
444 }
445}