1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::sync::Arc;
5use std::time::Duration;
6
7#[cfg(feature = "mcp")]
8use anyhow::anyhow;
9use anyhow::{Context, Result, bail};
10use greentic_flow::ir::{FlowIR, NodeIR};
11#[cfg(feature = "mcp")]
12use greentic_mcp::{ExecConfig, ExecRequest};
13#[cfg(feature = "mcp")]
14use greentic_types::TenantCtx as TypesTenantCtx;
15use handlebars::Handlebars;
16use parking_lot::RwLock;
17use serde::Deserialize;
18use serde_json::{Map as JsonMap, Value, json};
19use tokio::task;
20
21use super::mocks::MockLayer;
22use crate::config::{HostConfig, McpRetryConfig};
23use crate::pack::{FlowDescriptor, PackRuntime};
24#[cfg(feature = "mcp")]
25use crate::telemetry::tenant_context;
26use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
27
28pub struct FlowEngine {
29 pack: Arc<PackRuntime>,
30 flows: Vec<FlowDescriptor>,
31 flow_ir: RwLock<HashMap<String, FlowIR>>,
32 #[cfg(feature = "mcp")]
33 exec_config: ExecConfig,
34 template_engine: Arc<Handlebars<'static>>,
35 default_env: String,
36}
37
38impl FlowEngine {
39 pub async fn new(pack: Arc<PackRuntime>, config: Arc<HostConfig>) -> Result<Self> {
40 #[cfg(not(feature = "mcp"))]
41 let _ = &config;
42 let flows = pack.list_flows().await?;
43 for flow in &flows {
44 tracing::info!(flow_id = %flow.id, flow_type = %flow.flow_type, "registered flow");
45 }
46
47 #[cfg(feature = "mcp")]
48 let exec_config = config
49 .mcp_exec_config()
50 .context("failed to build MCP executor config")?;
51
52 let mut ir_map = HashMap::new();
53 for flow in &flows {
54 let pack_clone = Arc::clone(&pack);
55 let flow_id = flow.id.clone();
56 let task_flow_id = flow_id.clone();
57 match task::spawn_blocking(move || pack_clone.load_flow_ir(&task_flow_id)).await {
58 Ok(Ok(ir)) => {
59 ir_map.insert(flow_id, ir);
60 }
61 Ok(Err(err)) => {
62 tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
63 }
64 Err(err) => {
65 tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
66 }
67 }
68 }
69
70 let mut handlebars = Handlebars::new();
71 handlebars.set_strict_mode(false);
72
73 Ok(Self {
74 pack,
75 flows,
76 flow_ir: RwLock::new(ir_map),
77 #[cfg(feature = "mcp")]
78 exec_config,
79 template_engine: Arc::new(handlebars),
80 default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
81 })
82 }
83
84 async fn get_or_load_flow_ir(&self, flow_id: &str) -> Result<FlowIR> {
85 if let Some(ir) = self.flow_ir.read().get(flow_id).cloned() {
86 return Ok(ir);
87 }
88
89 let pack = Arc::clone(&self.pack);
90 let flow_id_owned = flow_id.to_string();
91 let task_flow_id = flow_id_owned.clone();
92 let ir = task::spawn_blocking(move || pack.load_flow_ir(&task_flow_id))
93 .await
94 .context("failed to join flow metadata task")??;
95 self.flow_ir
96 .write()
97 .insert(flow_id_owned.clone(), ir.clone());
98 Ok(ir)
99 }
100
101 pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<Value> {
102 let span = tracing::info_span!(
103 "flow.execute",
104 tenant = tracing::field::Empty,
105 flow_id = tracing::field::Empty,
106 node_id = tracing::field::Empty,
107 tool = tracing::field::Empty,
108 action = tracing::field::Empty
109 );
110 annotate_span(
111 &span,
112 &FlowSpanAttributes {
113 tenant: ctx.tenant,
114 flow_id: ctx.flow_id,
115 node_id: ctx.node_id,
116 tool: ctx.tool,
117 action: ctx.action,
118 },
119 );
120 set_flow_context(
121 &self.default_env,
122 ctx.tenant,
123 ctx.flow_id,
124 ctx.node_id,
125 ctx.provider_id,
126 ctx.session_id,
127 );
128 let retry_config = ctx.retry_config;
129 let original_input = input;
130 async move {
131 let mut attempt = 0u32;
132 loop {
133 attempt += 1;
134 match self.execute_once(&ctx, original_input.clone()).await {
135 Ok(value) => return Ok(value),
136 Err(err) => {
137 if attempt >= retry_config.max_attempts || !should_retry(&err) {
138 return Err(err);
139 }
140 let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
141 tracing::warn!(
142 tenant = ctx.tenant,
143 flow_id = ctx.flow_id,
144 attempt,
145 max_attempts = retry_config.max_attempts,
146 delay_ms = delay,
147 error = %err,
148 "transient flow execution failure, backing off"
149 );
150 tokio::time::sleep(Duration::from_millis(delay)).await;
151 }
152 }
153 }
154 }
155 .instrument(span)
156 .await
157 }
158
159 async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<Value> {
160 let flow_ir = self.get_or_load_flow_ir(ctx.flow_id).await?;
161
162 let mut state = ExecutionState::new(input);
163 let mut current = flow_ir
164 .start
165 .clone()
166 .or_else(|| flow_ir.nodes.keys().next().cloned())
167 .with_context(|| format!("flow {} has no start node", flow_ir.id))?;
168
169 loop {
170 let node = flow_ir
171 .nodes
172 .get(¤t)
173 .with_context(|| format!("node {current} not found"))?;
174
175 let context_value = state.context();
176 let payload = resolve_template_value(
177 self.template_engine.as_ref(),
178 &node.payload_expr,
179 &context_value,
180 )?;
181 let observed_payload = payload.clone();
182 let node_id = current.clone();
183 let event = NodeEvent {
184 context: ctx,
185 node_id: &node_id,
186 node,
187 payload: &observed_payload,
188 };
189 if let Some(observer) = ctx.observer {
190 observer.on_node_start(&event);
191 }
192 let dispatch = self
193 .dispatch_node(ctx, ¤t, node, &state, payload)
194 .await;
195 let output = match dispatch {
196 Ok(output) => {
197 if let Some(observer) = ctx.observer {
198 observer.on_node_end(&event, &output.payload);
199 }
200 output
201 }
202 Err(err) => {
203 if let Some(observer) = ctx.observer {
204 observer.on_node_error(&event, err.as_ref());
205 }
206 return Err(err);
207 }
208 };
209
210 state.nodes.insert(current.clone(), output.clone());
211
212 let mut next = None;
213 for route in &node.routes {
214 if route.out || matches!(route.to.as_deref(), Some("out")) {
215 return Ok(output.payload);
216 }
217 if let Some(to) = &route.to {
218 next = Some(to.clone());
219 break;
220 }
221 }
222
223 match next {
224 Some(n) => current = n,
225 None => return Ok(output.payload),
226 }
227 }
228 }
229
230 async fn dispatch_node(
231 &self,
232 ctx: &FlowContext<'_>,
233 _node_id: &str,
234 node: &NodeIR,
235 state: &ExecutionState,
236 payload: Value,
237 ) -> Result<NodeOutput> {
238 match node.component.as_str() {
239 "qa.process" => Ok(NodeOutput::new(payload)),
240 "mcp.exec" => self.execute_mcp(ctx, payload).await,
241 "templating.handlebars" => self.execute_template(state, payload),
242 component if component.starts_with("emit") => Ok(NodeOutput::new(payload)),
243 other => bail!("unsupported node component: {other}"),
244 }
245 }
246
247 async fn execute_mcp(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
248 #[cfg(not(feature = "mcp"))]
249 {
250 let _ = (ctx, payload);
251 bail!("crate built without `mcp` feature; mcp.exec nodes are unavailable");
252 }
253
254 #[cfg(feature = "mcp")]
255 {
256 #[derive(Deserialize)]
257 struct McpPayload {
258 component: String,
259 action: String,
260 #[serde(default)]
261 args: Value,
262 }
263
264 let payload: McpPayload =
265 serde_json::from_value(payload).context("invalid payload for mcp.exec node")?;
266
267 if let Some(mocks) = ctx.mocks
268 && let Some(result) = mocks.tool_short_circuit(&payload.component, &payload.action)
269 {
270 let value = result.map_err(|err| anyhow!(err))?;
271 return Ok(NodeOutput::new(value));
272 }
273
274 let request = ExecRequest {
275 component: payload.component,
276 action: payload.action,
277 args: payload.args,
278 tenant: Some(types_tenant_ctx(ctx, &self.default_env)),
279 };
280
281 let exec_config = self.exec_config.clone();
282 let exec_result =
283 task::spawn_blocking(move || greentic_mcp::exec(request, &exec_config))
284 .await
285 .context("failed to join mcp.exec")?;
286 let value = exec_result.map_err(|err| anyhow!(err))?;
287
288 Ok(NodeOutput::new(value))
289 }
290 }
291
292 fn execute_template(&self, state: &ExecutionState, payload: Value) -> Result<NodeOutput> {
293 let payload: TemplatePayload = serde_json::from_value(payload)
294 .context("invalid payload for templating.handlebars node")?;
295
296 let mut context = state.context();
297 if !payload.data.is_null() {
298 let data =
299 resolve_template_value(self.template_engine.as_ref(), &payload.data, &context)?;
300 merge_values(&mut context, data);
301 }
302
303 let rendered = render_template(
304 self.template_engine.as_ref(),
305 &payload.template,
306 &payload.partials,
307 &context,
308 )?;
309
310 Ok(NodeOutput::new(json!({ "text": rendered })))
311 }
312
313 pub fn flows(&self) -> &[FlowDescriptor] {
314 &self.flows
315 }
316
317 pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
318 self.flows
319 .iter()
320 .find(|descriptor| descriptor.flow_type == flow_type)
321 }
322
323 pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
324 self.flows
325 .iter()
326 .find(|descriptor| descriptor.id == flow_id)
327 }
328}
329
330pub trait ExecutionObserver: Send + Sync {
331 fn on_node_start(&self, event: &NodeEvent<'_>);
332 fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
333 fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
334}
335
336pub struct NodeEvent<'a> {
337 pub context: &'a FlowContext<'a>,
338 pub node_id: &'a str,
339 pub node: &'a NodeIR,
340 pub payload: &'a Value,
341}
342
343struct ExecutionState {
344 input: Value,
345 nodes: HashMap<String, NodeOutput>,
346}
347
348impl ExecutionState {
349 fn new(input: Value) -> Self {
350 Self {
351 input,
352 nodes: HashMap::new(),
353 }
354 }
355
356 fn context(&self) -> Value {
357 let mut nodes = JsonMap::new();
358 for (id, output) in &self.nodes {
359 nodes.insert(
360 id.clone(),
361 json!({
362 "ok": output.ok,
363 "payload": output.payload.clone(),
364 "meta": output.meta.clone(),
365 }),
366 );
367 }
368 json!({
369 "input": self.input.clone(),
370 "nodes": nodes,
371 })
372 }
373}
374
375#[derive(Clone)]
376struct NodeOutput {
377 ok: bool,
378 payload: Value,
379 meta: Value,
380}
381
382impl NodeOutput {
383 fn new(payload: Value) -> Self {
384 Self {
385 ok: true,
386 payload,
387 meta: Value::Null,
388 }
389 }
390}
391
392#[derive(Deserialize)]
393struct TemplatePayload {
394 template: String,
395 #[serde(default)]
396 partials: HashMap<String, String>,
397 #[serde(default)]
398 data: Value,
399}
400
401fn resolve_template_value(
402 engine: &Handlebars<'static>,
403 value: &Value,
404 context: &Value,
405) -> Result<Value> {
406 match value {
407 Value::String(s) => {
408 if s.contains("{{") {
409 let rendered = engine
410 .render_template(s, context)
411 .with_context(|| format!("failed to render template: {s}"))?;
412 Ok(Value::String(rendered))
413 } else {
414 Ok(Value::String(s.clone()))
415 }
416 }
417 Value::Array(items) => {
418 let values = items
419 .iter()
420 .map(|v| resolve_template_value(engine, v, context))
421 .collect::<Result<Vec<_>>>()?;
422 Ok(Value::Array(values))
423 }
424 Value::Object(map) => {
425 let mut resolved = JsonMap::new();
426 for (key, v) in map {
427 resolved.insert(key.clone(), resolve_template_value(engine, v, context)?);
428 }
429 Ok(Value::Object(resolved))
430 }
431 other => Ok(other.clone()),
432 }
433}
434
435fn merge_values(target: &mut Value, addition: Value) {
436 match (target, addition) {
437 (Value::Object(target_map), Value::Object(add_map)) => {
438 for (key, value) in add_map {
439 merge_values(target_map.entry(key).or_insert(Value::Null), value);
440 }
441 }
442 (slot, value) => {
443 *slot = value;
444 }
445 }
446}
447
448fn render_template(
449 base: &Handlebars<'static>,
450 template: &str,
451 partials: &HashMap<String, String>,
452 context: &Value,
453) -> Result<String> {
454 let mut engine = base.clone();
455 for (name, body) in partials {
456 engine
457 .register_template_string(name, body)
458 .with_context(|| format!("failed to register partial {name}"))?;
459 }
460 engine
461 .render_template(template, context)
462 .with_context(|| "failed to render template")
463}
464
/// Builds the tenant context sent with MCP requests from the flow context and `default_env`.
#[cfg(feature = "mcp")]
fn types_tenant_ctx(ctx: &FlowContext<'_>, default_env: &str) -> TypesTenantCtx {
    // The flow id is always known here, unlike the other optional identifiers.
    let flow_id = Some(ctx.flow_id);
    tenant_context(
        default_env,
        ctx.tenant,
        flow_id,
        ctx.node_id,
        ctx.provider_id,
        ctx.session_id,
    )
}
476
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A template that delegates to a partial can read the flow input, an earlier node's
    /// payload, and the extra data merged into the context.
    #[test]
    fn templating_renders_with_partials_and_data() {
        let mut state = ExecutionState::new(json!({ "city": "London" }));
        state.nodes.insert(
            "forecast".to_string(),
            NodeOutput::new(json!({ "temp": "20C" })),
        );

        let partials = HashMap::from([(
            "line".to_string(),
            "Weather in {{input.city}}: {{nodes.forecast.payload.temp}} {{extra.note}}".to_string(),
        )]);
        let payload = TemplatePayload {
            template: "{{> line}}".to_string(),
            partials,
            data: json!({ "extra": { "note": "today" } }),
        };

        let mut base = Handlebars::new();
        base.set_strict_mode(false);

        // Mirror `execute_template`: resolve the extra data, merge it, then render.
        let mut context = state.context();
        let extra = resolve_template_value(&base, &payload.data, &context).unwrap();
        merge_values(&mut context, extra);

        let rendered =
            render_template(&base, &payload.template, &payload.partials, &context).unwrap();
        assert_eq!(rendered, "Weather in London: 20C today");
    }
}
514
515use tracing::Instrument;
516
517pub struct FlowContext<'a> {
518 pub tenant: &'a str,
519 pub flow_id: &'a str,
520 pub node_id: Option<&'a str>,
521 pub tool: Option<&'a str>,
522 pub action: Option<&'a str>,
523 pub session_id: Option<&'a str>,
524 pub provider_id: Option<&'a str>,
525 pub retry_config: RetryConfig,
526 pub observer: Option<&'a dyn ExecutionObserver>,
527 pub mocks: Option<&'a MockLayer>,
528}
529
/// Retry policy for a flow execution.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct RetryConfig {
    /// Total number of attempts made before the last error is returned.
    pub max_attempts: u32,
    /// Base delay in milliseconds from which the backoff between attempts is computed.
    pub base_delay_ms: u64,
}
535
536fn should_retry(err: &anyhow::Error) -> bool {
537 let lower = err.to_string().to_lowercase();
538 lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
539}
540
541impl From<McpRetryConfig> for RetryConfig {
542 fn from(value: McpRetryConfig) -> Self {
543 Self {
544 max_attempts: value.max_attempts.max(1),
545 base_delay_ms: value.base_delay_ms.max(50),
546 }
547 }
548}