greentic_runner_host/runner/
engine.rs

1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::sync::Arc;
5use std::time::Duration;
6
7#[cfg(feature = "mcp")]
8use anyhow::anyhow;
9use anyhow::{Context, Result, bail};
10use greentic_flow::ir::{FlowIR, NodeIR};
11#[cfg(feature = "mcp")]
12use greentic_mcp::{ExecConfig, ExecRequest};
13#[cfg(feature = "mcp")]
14use greentic_types::TenantCtx as TypesTenantCtx;
15use handlebars::Handlebars;
16use parking_lot::RwLock;
17use serde::Deserialize;
18use serde_json::{Map as JsonMap, Value, json};
19use tokio::task;
20
21use super::mocks::MockLayer;
22use crate::config::{HostConfig, McpRetryConfig};
23use crate::pack::{FlowDescriptor, PackRuntime};
24#[cfg(feature = "mcp")]
25use crate::telemetry::tenant_context;
26use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
27
/// Executes the flows exposed by a loaded pack.
///
/// Holds the pack handle, the flow descriptors the pack reported at
/// construction time, a cache of flow IR keyed by flow id, and the shared
/// Handlebars registry used to render node payloads and templates.
pub struct FlowEngine {
    /// Pack runtime used to list flows and to load their IR.
    pack: Arc<PackRuntime>,
    /// Descriptors returned by the pack when the engine was created.
    flows: Vec<FlowDescriptor>,
    /// Flow IR cache keyed by flow id; filled in `new` and on cache misses.
    flow_ir: RwLock<HashMap<String, FlowIR>>,
    /// MCP executor configuration, cloned for each `mcp.exec` invocation.
    #[cfg(feature = "mcp")]
    exec_config: ExecConfig,
    /// Handlebars registry (strict mode disabled) shared by all executions.
    template_engine: Arc<Handlebars<'static>>,
    /// Environment name read from `GREENTIC_ENV`, defaulting to `"local"`.
    default_env: String,
}
37
38impl FlowEngine {
39    pub async fn new(pack: Arc<PackRuntime>, config: Arc<HostConfig>) -> Result<Self> {
40        #[cfg(not(feature = "mcp"))]
41        let _ = &config;
42        let flows = pack.list_flows().await?;
43        for flow in &flows {
44            tracing::info!(flow_id = %flow.id, flow_type = %flow.flow_type, "registered flow");
45        }
46
47        #[cfg(feature = "mcp")]
48        let exec_config = config
49            .mcp_exec_config()
50            .context("failed to build MCP executor config")?;
51
52        let mut ir_map = HashMap::new();
53        for flow in &flows {
54            let pack_clone = Arc::clone(&pack);
55            let flow_id = flow.id.clone();
56            let task_flow_id = flow_id.clone();
57            match task::spawn_blocking(move || pack_clone.load_flow_ir(&task_flow_id)).await {
58                Ok(Ok(ir)) => {
59                    ir_map.insert(flow_id, ir);
60                }
61                Ok(Err(err)) => {
62                    tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
63                }
64                Err(err) => {
65                    tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
66                }
67            }
68        }
69
70        let mut handlebars = Handlebars::new();
71        handlebars.set_strict_mode(false);
72
73        Ok(Self {
74            pack,
75            flows,
76            flow_ir: RwLock::new(ir_map),
77            #[cfg(feature = "mcp")]
78            exec_config,
79            template_engine: Arc::new(handlebars),
80            default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
81        })
82    }
83
84    async fn get_or_load_flow_ir(&self, flow_id: &str) -> Result<FlowIR> {
85        if let Some(ir) = self.flow_ir.read().get(flow_id).cloned() {
86            return Ok(ir);
87        }
88
89        let pack = Arc::clone(&self.pack);
90        let flow_id_owned = flow_id.to_string();
91        let task_flow_id = flow_id_owned.clone();
92        let ir = task::spawn_blocking(move || pack.load_flow_ir(&task_flow_id))
93            .await
94            .context("failed to join flow metadata task")??;
95        self.flow_ir
96            .write()
97            .insert(flow_id_owned.clone(), ir.clone());
98        Ok(ir)
99    }
100
101    pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<Value> {
102        let span = tracing::info_span!(
103            "flow.execute",
104            tenant = tracing::field::Empty,
105            flow_id = tracing::field::Empty,
106            node_id = tracing::field::Empty,
107            tool = tracing::field::Empty,
108            action = tracing::field::Empty
109        );
110        annotate_span(
111            &span,
112            &FlowSpanAttributes {
113                tenant: ctx.tenant,
114                flow_id: ctx.flow_id,
115                node_id: ctx.node_id,
116                tool: ctx.tool,
117                action: ctx.action,
118            },
119        );
120        set_flow_context(
121            &self.default_env,
122            ctx.tenant,
123            ctx.flow_id,
124            ctx.node_id,
125            ctx.provider_id,
126            ctx.session_id,
127        );
128        let retry_config = ctx.retry_config;
129        let original_input = input;
130        async move {
131            let mut attempt = 0u32;
132            loop {
133                attempt += 1;
134                match self.execute_once(&ctx, original_input.clone()).await {
135                    Ok(value) => return Ok(value),
136                    Err(err) => {
137                        if attempt >= retry_config.max_attempts || !should_retry(&err) {
138                            return Err(err);
139                        }
140                        let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
141                        tracing::warn!(
142                            tenant = ctx.tenant,
143                            flow_id = ctx.flow_id,
144                            attempt,
145                            max_attempts = retry_config.max_attempts,
146                            delay_ms = delay,
147                            error = %err,
148                            "transient flow execution failure, backing off"
149                        );
150                        tokio::time::sleep(Duration::from_millis(delay)).await;
151                    }
152                }
153            }
154        }
155        .instrument(span)
156        .await
157    }
158
159    async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<Value> {
160        let flow_ir = self.get_or_load_flow_ir(ctx.flow_id).await?;
161
162        let mut state = ExecutionState::new(input);
163        let mut current = flow_ir
164            .start
165            .clone()
166            .or_else(|| flow_ir.nodes.keys().next().cloned())
167            .with_context(|| format!("flow {} has no start node", flow_ir.id))?;
168
169        loop {
170            let node = flow_ir
171                .nodes
172                .get(&current)
173                .with_context(|| format!("node {current} not found"))?;
174
175            let context_value = state.context();
176            let payload = resolve_template_value(
177                self.template_engine.as_ref(),
178                &node.payload_expr,
179                &context_value,
180            )?;
181            let observed_payload = payload.clone();
182            let node_id = current.clone();
183            let event = NodeEvent {
184                context: ctx,
185                node_id: &node_id,
186                node,
187                payload: &observed_payload,
188            };
189            if let Some(observer) = ctx.observer {
190                observer.on_node_start(&event);
191            }
192            let dispatch = self
193                .dispatch_node(ctx, &current, node, &state, payload)
194                .await;
195            let output = match dispatch {
196                Ok(output) => {
197                    if let Some(observer) = ctx.observer {
198                        observer.on_node_end(&event, &output.payload);
199                    }
200                    output
201                }
202                Err(err) => {
203                    if let Some(observer) = ctx.observer {
204                        observer.on_node_error(&event, err.as_ref());
205                    }
206                    return Err(err);
207                }
208            };
209
210            state.nodes.insert(current.clone(), output.clone());
211
212            let mut next = None;
213            for route in &node.routes {
214                if route.out || matches!(route.to.as_deref(), Some("out")) {
215                    return Ok(output.payload);
216                }
217                if let Some(to) = &route.to {
218                    next = Some(to.clone());
219                    break;
220                }
221            }
222
223            match next {
224                Some(n) => current = n,
225                None => return Ok(output.payload),
226            }
227        }
228    }
229
230    async fn dispatch_node(
231        &self,
232        ctx: &FlowContext<'_>,
233        _node_id: &str,
234        node: &NodeIR,
235        state: &ExecutionState,
236        payload: Value,
237    ) -> Result<NodeOutput> {
238        match node.component.as_str() {
239            "qa.process" => Ok(NodeOutput::new(payload)),
240            "mcp.exec" => self.execute_mcp(ctx, payload).await,
241            "templating.handlebars" => self.execute_template(state, payload),
242            component if component.starts_with("emit") => Ok(NodeOutput::new(payload)),
243            other => bail!("unsupported node component: {other}"),
244        }
245    }
246
247    async fn execute_mcp(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
248        #[cfg(not(feature = "mcp"))]
249        {
250            let _ = (ctx, payload);
251            bail!("crate built without `mcp` feature; mcp.exec nodes are unavailable");
252        }
253
254        #[cfg(feature = "mcp")]
255        {
256            #[derive(Deserialize)]
257            struct McpPayload {
258                component: String,
259                action: String,
260                #[serde(default)]
261                args: Value,
262            }
263
264            let payload: McpPayload =
265                serde_json::from_value(payload).context("invalid payload for mcp.exec node")?;
266
267            if let Some(mocks) = ctx.mocks
268                && let Some(result) = mocks.tool_short_circuit(&payload.component, &payload.action)
269            {
270                let value = result.map_err(|err| anyhow!(err))?;
271                return Ok(NodeOutput::new(value));
272            }
273
274            let request = ExecRequest {
275                component: payload.component,
276                action: payload.action,
277                args: payload.args,
278                tenant: Some(types_tenant_ctx(ctx, &self.default_env)),
279            };
280
281            let exec_config = self.exec_config.clone();
282            let exec_result =
283                task::spawn_blocking(move || greentic_mcp::exec(request, &exec_config))
284                    .await
285                    .context("failed to join mcp.exec")?;
286            let value = exec_result.map_err(|err| anyhow!(err))?;
287
288            Ok(NodeOutput::new(value))
289        }
290    }
291
292    fn execute_template(&self, state: &ExecutionState, payload: Value) -> Result<NodeOutput> {
293        let payload: TemplatePayload = serde_json::from_value(payload)
294            .context("invalid payload for templating.handlebars node")?;
295
296        let mut context = state.context();
297        if !payload.data.is_null() {
298            let data =
299                resolve_template_value(self.template_engine.as_ref(), &payload.data, &context)?;
300            merge_values(&mut context, data);
301        }
302
303        let rendered = render_template(
304            self.template_engine.as_ref(),
305            &payload.template,
306            &payload.partials,
307            &context,
308        )?;
309
310        Ok(NodeOutput::new(json!({ "text": rendered })))
311    }
312
313    pub fn flows(&self) -> &[FlowDescriptor] {
314        &self.flows
315    }
316
317    pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
318        self.flows
319            .iter()
320            .find(|descriptor| descriptor.flow_type == flow_type)
321    }
322
323    pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
324        self.flows
325            .iter()
326            .find(|descriptor| descriptor.id == flow_id)
327    }
328}
329
/// Hooks invoked by the engine around the execution of every node.
///
/// Implementations must be `Send + Sync`; the engine calls the hooks
/// synchronously from the task that executes the flow.
pub trait ExecutionObserver: Send + Sync {
    /// Called after the node payload has been rendered, before dispatch.
    fn on_node_start(&self, event: &NodeEvent<'_>);
    /// Called after a node was dispatched successfully, with its output payload.
    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
    /// Called when dispatching a node failed; the flow attempt aborts afterwards.
    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
}
335
/// Snapshot handed to an [`ExecutionObserver`] for a single node execution.
pub struct NodeEvent<'a> {
    /// Context of the flow execution the node belongs to.
    pub context: &'a FlowContext<'a>,
    /// Id of the node being executed.
    pub node_id: &'a str,
    /// IR of the node being executed.
    pub node: &'a NodeIR,
    /// Payload of the node after template resolution.
    pub payload: &'a Value,
}
342
/// Mutable state of one flow attempt: the original input plus the output of
/// every node executed so far.
struct ExecutionState {
    /// Input value the flow was started with.
    input: Value,
    /// Outputs of executed nodes, keyed by node id.
    nodes: HashMap<String, NodeOutput>,
}
347
348impl ExecutionState {
349    fn new(input: Value) -> Self {
350        Self {
351            input,
352            nodes: HashMap::new(),
353        }
354    }
355
356    fn context(&self) -> Value {
357        let mut nodes = JsonMap::new();
358        for (id, output) in &self.nodes {
359            nodes.insert(
360                id.clone(),
361                json!({
362                    "ok": output.ok,
363                    "payload": output.payload.clone(),
364                    "meta": output.meta.clone(),
365                }),
366            );
367        }
368        json!({
369            "input": self.input.clone(),
370            "nodes": nodes,
371        })
372    }
373}
374
/// Result of executing a single node.
#[derive(Clone)]
struct NodeOutput {
    /// Success flag exposed to templates as `ok`; `NodeOutput::new` sets it to `true`.
    ok: bool,
    /// Value produced by the node.
    payload: Value,
    /// Extra metadata exposed to templates as `meta`; `NodeOutput::new` sets it to null.
    meta: Value,
}
381
382impl NodeOutput {
383    fn new(payload: Value) -> Self {
384        Self {
385            ok: true,
386            payload,
387            meta: Value::Null,
388        }
389    }
390}
391
/// Payload accepted by `templating.handlebars` nodes.
#[derive(Deserialize)]
struct TemplatePayload {
    /// Handlebars template source to render.
    template: String,
    /// Named partials registered before rendering; empty when omitted.
    #[serde(default)]
    partials: HashMap<String, String>,
    /// Extra data merged into the render context; null when omitted.
    #[serde(default)]
    data: Value,
}
400
401fn resolve_template_value(
402    engine: &Handlebars<'static>,
403    value: &Value,
404    context: &Value,
405) -> Result<Value> {
406    match value {
407        Value::String(s) => {
408            if s.contains("{{") {
409                let rendered = engine
410                    .render_template(s, context)
411                    .with_context(|| format!("failed to render template: {s}"))?;
412                Ok(Value::String(rendered))
413            } else {
414                Ok(Value::String(s.clone()))
415            }
416        }
417        Value::Array(items) => {
418            let values = items
419                .iter()
420                .map(|v| resolve_template_value(engine, v, context))
421                .collect::<Result<Vec<_>>>()?;
422            Ok(Value::Array(values))
423        }
424        Value::Object(map) => {
425            let mut resolved = JsonMap::new();
426            for (key, v) in map {
427                resolved.insert(key.clone(), resolve_template_value(engine, v, context)?);
428            }
429            Ok(Value::Object(resolved))
430        }
431        other => Ok(other.clone()),
432    }
433}
434
435fn merge_values(target: &mut Value, addition: Value) {
436    match (target, addition) {
437        (Value::Object(target_map), Value::Object(add_map)) => {
438            for (key, value) in add_map {
439                merge_values(target_map.entry(key).or_insert(Value::Null), value);
440            }
441        }
442        (slot, value) => {
443            *slot = value;
444        }
445    }
446}
447
448fn render_template(
449    base: &Handlebars<'static>,
450    template: &str,
451    partials: &HashMap<String, String>,
452    context: &Value,
453) -> Result<String> {
454    let mut engine = base.clone();
455    for (name, body) in partials {
456        engine
457            .register_template_string(name, body)
458            .with_context(|| format!("failed to register partial {name}"))?;
459    }
460    engine
461        .render_template(template, context)
462        .with_context(|| "failed to render template")
463}
464
/// Builds the tenant context attached to an MCP exec request.
///
/// Forwards `default_env` and the tenant, flow, node, provider and session
/// identifiers of `ctx` to `telemetry::tenant_context`.
#[cfg(feature = "mcp")]
fn types_tenant_ctx(ctx: &FlowContext<'_>, default_env: &str) -> TypesTenantCtx {
    tenant_context(
        default_env,
        ctx.tenant,
        Some(ctx.flow_id),
        ctx.node_id,
        ctx.provider_id,
        ctx.session_id,
    )
}
476
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Renders a template that consists of a single partial and checks that
    /// the flow input, a previous node output and the merged `data` are all
    /// visible to that partial.
    #[test]
    fn templating_renders_with_partials_and_data() {
        // Execution state with an input and one already executed node.
        let mut state = ExecutionState::new(json!({ "city": "London" }));
        state.nodes.insert(
            "forecast".to_string(),
            NodeOutput::new(json!({ "temp": "20C" })),
        );

        let mut partials = HashMap::new();
        partials.insert(
            "line".to_string(),
            "Weather in {{input.city}}: {{nodes.forecast.payload.temp}} {{extra.note}}".to_string(),
        );

        let payload = TemplatePayload {
            template: "{{> line}}".to_string(),
            partials,
            data: json!({ "extra": { "note": "today" } }),
        };

        let mut base = Handlebars::new();
        base.set_strict_mode(false);

        // Same steps as `FlowEngine::execute_template`: resolve, merge, render.
        let mut context = state.context();
        let data = resolve_template_value(&base, &payload.data, &context).unwrap();
        merge_values(&mut context, data);
        let rendered =
            render_template(&base, &payload.template, &payload.partials, &context).unwrap();

        assert_eq!(rendered, "Weather in London: 20C today");
    }
}
514
515use tracing::Instrument;
516
/// Per-invocation parameters of [`FlowEngine::execute`].
pub struct FlowContext<'a> {
    /// Tenant the flow runs for; recorded on the span and in retry logs.
    pub tenant: &'a str,
    /// Id of the flow to execute.
    pub flow_id: &'a str,
    /// Optional node id forwarded to the span and flow/tenant context helpers.
    pub node_id: Option<&'a str>,
    /// Optional tool name recorded on the execution span.
    pub tool: Option<&'a str>,
    /// Optional action name recorded on the execution span.
    pub action: Option<&'a str>,
    /// Optional session id forwarded to the flow/tenant context helpers.
    pub session_id: Option<&'a str>,
    /// Optional provider id forwarded to the flow/tenant context helpers.
    pub provider_id: Option<&'a str>,
    /// Retry policy applied to the flow execution as a whole.
    pub retry_config: RetryConfig,
    /// Observer notified about the start, end and error of every node.
    pub observer: Option<&'a dyn ExecutionObserver>,
    /// Mock layer consulted by `mcp.exec` nodes before the real executor.
    pub mocks: Option<&'a MockLayer>,
}
529
/// Retry policy used by [`FlowEngine::execute`].
#[derive(Copy, Clone)]
pub struct RetryConfig {
    /// Total number of attempts, the first one included.
    pub max_attempts: u32,
    /// Base delay in milliseconds handed to `backoff_delay_ms`.
    pub base_delay_ms: u64,
}
535
536fn should_retry(err: &anyhow::Error) -> bool {
537    let lower = err.to_string().to_lowercase();
538    lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
539}
540
541impl From<McpRetryConfig> for RetryConfig {
542    fn from(value: McpRetryConfig) -> Self {
543        Self {
544            max_attempts: value.max_attempts.max(1),
545            base_delay_ms: value.base_delay_ms.max(50),
546        }
547    }
548}