greentic_runner_host/runner/
engine.rs

1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::sync::Arc;
5use std::time::Duration;
6
7use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
8use anyhow::{Context, Result, anyhow, bail};
9use greentic_flow::ir::{FlowIR, NodeIR};
10use parking_lot::RwLock;
11use serde::{Deserialize, Serialize};
12use serde_json::{Map as JsonMap, Value, json};
13use tokio::task;
14
15use super::mocks::MockLayer;
16use crate::config::{FlowRetryConfig, HostConfig};
17use crate::pack::{FlowDescriptor, PackRuntime};
18use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
19
/// Executes flows exposed by a set of loaded packs.
///
/// Flow ids are global across packs: when several packs register the same id, the pack
/// that appears later in `packs` owns it (see `FlowEngine::new`).
pub struct FlowEngine {
    /// Loaded packs; the values stored in `flow_sources` index into this vector.
    packs: Vec<Arc<PackRuntime>>,
    /// One descriptor per registered flow id (duplicates across packs are collapsed).
    flows: Vec<FlowDescriptor>,
    /// Flow id -> index into `packs` of the pack that owns the flow.
    flow_sources: HashMap<String, usize>,
    /// Parsed flow IR keyed by flow id; filled at construction and lazily on first use.
    flow_ir: RwLock<HashMap<String, FlowIR>>,
    /// Environment name passed to `set_flow_context`; read from `GREENTIC_ENV`, else `"local"`.
    default_env: String,
}
27
/// Serializable state of a paused flow, sufficient to continue it via `FlowEngine::resume`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FlowSnapshot {
    /// Id of the flow that paused.
    pub flow_id: String,
    /// Node at which execution continues on resume (the wait node's route target).
    pub next_node: String,
    /// Execution state captured at the pause point.
    pub state: ExecutionState,
}
34
/// Details of a flow that paused at a wait node.
#[derive(Clone, Debug)]
pub struct FlowWait {
    /// Optional human/machine readable reason taken from the wait node's payload.
    pub reason: Option<String>,
    /// Snapshot to hand back to `FlowEngine::resume`.
    pub snapshot: FlowSnapshot,
}
40
/// Whether a flow run finished or is paused waiting for further input.
#[derive(Clone, Debug)]
pub enum FlowStatus {
    /// The flow reached an exit route or a node with no outgoing route.
    Completed,
    /// The flow paused at a wait node and can be resumed from the contained snapshot.
    Waiting(FlowWait),
}
46
/// Result of executing or resuming a flow.
#[derive(Clone, Debug)]
pub struct FlowExecution {
    /// Flow output: the final payload, or an array of emitted payloads when any `emit*`
    /// node ran (see `ExecutionState::finalize_with`).
    pub output: Value,
    /// Completion or pause status.
    pub status: FlowStatus,
}
52
53impl FlowExecution {
54    fn completed(output: Value) -> Self {
55        Self {
56            output,
57            status: FlowStatus::Completed,
58        }
59    }
60
61    fn waiting(output: Value, wait: FlowWait) -> Self {
62        Self {
63            output,
64            status: FlowStatus::Waiting(wait),
65        }
66    }
67}
68
69impl FlowEngine {
70    pub async fn new(packs: Vec<Arc<PackRuntime>>, _config: Arc<HostConfig>) -> Result<Self> {
71        let mut flow_sources = HashMap::new();
72        let mut descriptors = Vec::new();
73        for (idx, pack) in packs.iter().enumerate() {
74            let flows = pack.list_flows().await?;
75            for flow in flows {
76                tracing::info!(
77                    flow_id = %flow.id,
78                    flow_type = %flow.flow_type,
79                    pack_index = idx,
80                    "registered flow"
81                );
82                flow_sources.insert(flow.id.clone(), idx);
83                descriptors.retain(|existing: &FlowDescriptor| existing.id != flow.id);
84                descriptors.push(flow);
85            }
86        }
87
88        let mut ir_map = HashMap::new();
89        for flow in &descriptors {
90            if let Some(&pack_idx) = flow_sources.get(&flow.id) {
91                let pack_clone = Arc::clone(&packs[pack_idx]);
92                let flow_id = flow.id.clone();
93                let task_flow_id = flow_id.clone();
94                match task::spawn_blocking(move || pack_clone.load_flow_ir(&task_flow_id)).await {
95                    Ok(Ok(ir)) => {
96                        ir_map.insert(flow_id, ir);
97                    }
98                    Ok(Err(err)) => {
99                        tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
100                    }
101                    Err(err) => {
102                        tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
103                    }
104                }
105            }
106        }
107
108        Ok(Self {
109            packs,
110            flows: descriptors,
111            flow_sources,
112            flow_ir: RwLock::new(ir_map),
113            default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
114        })
115    }
116
117    async fn get_or_load_flow_ir(&self, flow_id: &str) -> Result<FlowIR> {
118        if let Some(ir) = self.flow_ir.read().get(flow_id).cloned() {
119            return Ok(ir);
120        }
121
122        let pack_idx = *self
123            .flow_sources
124            .get(flow_id)
125            .with_context(|| format!("flow {flow_id} not registered"))?;
126        let pack = Arc::clone(&self.packs[pack_idx]);
127        let flow_id_owned = flow_id.to_string();
128        let task_flow_id = flow_id_owned.clone();
129        let ir = task::spawn_blocking(move || pack.load_flow_ir(&task_flow_id))
130            .await
131            .context("failed to join flow metadata task")??;
132        self.flow_ir
133            .write()
134            .insert(flow_id_owned.clone(), ir.clone());
135        Ok(ir)
136    }
137
138    pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
139        let span = tracing::info_span!(
140            "flow.execute",
141            tenant = tracing::field::Empty,
142            flow_id = tracing::field::Empty,
143            node_id = tracing::field::Empty,
144            tool = tracing::field::Empty,
145            action = tracing::field::Empty
146        );
147        annotate_span(
148            &span,
149            &FlowSpanAttributes {
150                tenant: ctx.tenant,
151                flow_id: ctx.flow_id,
152                node_id: ctx.node_id,
153                tool: ctx.tool,
154                action: ctx.action,
155            },
156        );
157        set_flow_context(
158            &self.default_env,
159            ctx.tenant,
160            ctx.flow_id,
161            ctx.node_id,
162            ctx.provider_id,
163            ctx.session_id,
164        );
165        let retry_config = ctx.retry_config;
166        let original_input = input;
167        async move {
168            let mut attempt = 0u32;
169            loop {
170                attempt += 1;
171                match self.execute_once(&ctx, original_input.clone()).await {
172                    Ok(value) => return Ok(value),
173                    Err(err) => {
174                        if attempt >= retry_config.max_attempts || !should_retry(&err) {
175                            return Err(err);
176                        }
177                        let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
178                        tracing::warn!(
179                            tenant = ctx.tenant,
180                            flow_id = ctx.flow_id,
181                            attempt,
182                            max_attempts = retry_config.max_attempts,
183                            delay_ms = delay,
184                            error = %err,
185                            "transient flow execution failure, backing off"
186                        );
187                        tokio::time::sleep(Duration::from_millis(delay)).await;
188                    }
189                }
190            }
191        }
192        .instrument(span)
193        .await
194    }
195
196    pub async fn resume(
197        &self,
198        ctx: FlowContext<'_>,
199        snapshot: FlowSnapshot,
200        input: Value,
201    ) -> Result<FlowExecution> {
202        if snapshot.flow_id != ctx.flow_id {
203            bail!(
204                "snapshot flow {} does not match requested {}",
205                snapshot.flow_id,
206                ctx.flow_id
207            );
208        }
209        let flow_ir = self.get_or_load_flow_ir(ctx.flow_id).await?;
210        let mut state = snapshot.state;
211        state.replace_input(input);
212        self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
213            .await
214    }
215
216    async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
217        let flow_ir = self.get_or_load_flow_ir(ctx.flow_id).await?;
218        let state = ExecutionState::new(input);
219        self.drive_flow(ctx, flow_ir, state, None).await
220    }
221
222    async fn drive_flow(
223        &self,
224        ctx: &FlowContext<'_>,
225        flow_ir: FlowIR,
226        mut state: ExecutionState,
227        resume_from: Option<String>,
228    ) -> Result<FlowExecution> {
229        let mut current = flow_ir
230            .start
231            .clone()
232            .or_else(|| flow_ir.nodes.keys().next().cloned())
233            .with_context(|| format!("flow {} has no start node", flow_ir.id))?;
234        if let Some(resume) = resume_from {
235            current = resume;
236        }
237        let mut final_payload = None;
238
239        loop {
240            let node = flow_ir
241                .nodes
242                .get(&current)
243                .with_context(|| format!("node {current} not found"))?;
244
245            let payload = node.payload_expr.clone();
246            let observed_payload = payload.clone();
247            let node_id = current.clone();
248            let event = NodeEvent {
249                context: ctx,
250                node_id: &node_id,
251                node,
252                payload: &observed_payload,
253            };
254            if let Some(observer) = ctx.observer {
255                observer.on_node_start(&event);
256            }
257            let DispatchOutcome {
258                output,
259                wait_reason,
260            } = self
261                .dispatch_node(ctx, &current, node, &mut state, payload)
262                .await?;
263
264            state.nodes.insert(current.clone(), output.clone());
265
266            let mut next = None;
267            let mut should_exit = false;
268            for route in &node.routes {
269                if route.out || matches!(route.to.as_deref(), Some("out")) {
270                    final_payload = Some(output.payload.clone());
271                    should_exit = true;
272                    break;
273                }
274                if let Some(to) = &route.to {
275                    next = Some(to.clone());
276                    break;
277                }
278            }
279
280            if let Some(wait_reason) = wait_reason {
281                let resume_target = next.clone().ok_or_else(|| {
282                    anyhow!("session.wait node {current} requires a non-empty route")
283                })?;
284                let mut snapshot_state = state.clone();
285                snapshot_state.clear_egress();
286                let snapshot = FlowSnapshot {
287                    flow_id: ctx.flow_id.to_string(),
288                    next_node: resume_target,
289                    state: snapshot_state,
290                };
291                let output_value = state.clone().finalize_with(None);
292                return Ok(FlowExecution::waiting(
293                    output_value,
294                    FlowWait {
295                        reason: Some(wait_reason),
296                        snapshot,
297                    },
298                ));
299            }
300
301            if should_exit {
302                break;
303            }
304
305            match next {
306                Some(n) => current = n,
307                None => {
308                    final_payload = Some(output.payload.clone());
309                    break;
310                }
311            }
312        }
313
314        let payload = final_payload.unwrap_or(Value::Null);
315        Ok(FlowExecution::completed(state.finalize_with(Some(payload))))
316    }
317
318    async fn dispatch_node(
319        &self,
320        ctx: &FlowContext<'_>,
321        node_id: &str,
322        node: &NodeIR,
323        state: &mut ExecutionState,
324        payload: Value,
325    ) -> Result<DispatchOutcome> {
326        match node.component.as_str() {
327            "component.exec" => self
328                .execute_component_exec(ctx, node_id, state, payload)
329                .await
330                .map(DispatchOutcome::complete),
331            "flow.call" => self
332                .execute_flow_call(ctx, payload)
333                .await
334                .map(DispatchOutcome::complete),
335            component if component.starts_with("emit") => {
336                state.push_egress(payload.clone());
337                Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
338            }
339            "session.wait" => {
340                let reason = extract_wait_reason(&payload);
341                Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
342            }
343            other => bail!(
344                "unsupported node component `{other}` in flow {} node {}",
345                ctx.flow_id,
346                node_id
347            ),
348        }
349    }
350
351    async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
352        #[derive(Deserialize)]
353        struct FlowCallPayload {
354            #[serde(alias = "flow")]
355            flow_id: String,
356            #[serde(default)]
357            input: Value,
358        }
359
360        let call: FlowCallPayload =
361            serde_json::from_value(payload).context("invalid payload for flow.call node")?;
362        if call.flow_id.trim().is_empty() {
363            bail!("flow.call requires a non-empty flow_id");
364        }
365
366        let sub_input = if call.input.is_null() {
367            Value::Null
368        } else {
369            call.input
370        };
371
372        let flow_id_owned = call.flow_id;
373        let action = "flow.call";
374        let sub_ctx = FlowContext {
375            tenant: ctx.tenant,
376            flow_id: flow_id_owned.as_str(),
377            node_id: None,
378            tool: ctx.tool,
379            action: Some(action),
380            session_id: ctx.session_id,
381            provider_id: ctx.provider_id,
382            retry_config: ctx.retry_config,
383            observer: ctx.observer,
384            mocks: ctx.mocks,
385        };
386
387        let execution = Box::pin(self.execute(sub_ctx, sub_input))
388            .await
389            .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
390        match execution.status {
391            FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
392            FlowStatus::Waiting(wait) => bail!(
393                "flow.call cannot pause (flow {} waiting {:?})",
394                flow_id_owned,
395                wait.reason
396            ),
397        }
398    }
399
400    async fn execute_component_exec(
401        &self,
402        ctx: &FlowContext<'_>,
403        node_id: &str,
404        state: &ExecutionState,
405        payload: Value,
406    ) -> Result<NodeOutput> {
407        #[derive(Deserialize)]
408        struct ComponentPayload {
409            #[serde(default, alias = "component_ref", alias = "component")]
410            component: Option<String>,
411            #[serde(alias = "op")]
412            operation: Option<String>,
413            #[serde(default)]
414            input: Value,
415            #[serde(default)]
416            config: Value,
417        }
418
419        let payload: ComponentPayload =
420            serde_json::from_value(payload).context("invalid payload for component.exec")?;
421        let component_ref = payload
422            .component
423            .filter(|v| !v.trim().is_empty())
424            .with_context(|| "component.exec requires a component_ref")?;
425        let operation = payload
426            .operation
427            .filter(|v| !v.trim().is_empty())
428            .with_context(|| "component.exec requires an operation")?;
429        let mut input = payload.input;
430        if let Value::Object(mut map) = input {
431            map.entry("state".to_string())
432                .or_insert_with(|| state.context());
433            input = Value::Object(map);
434        }
435        let input_json = serde_json::to_string(&input)?;
436        let config_json = if payload.config.is_null() {
437            None
438        } else {
439            Some(serde_json::to_string(&payload.config)?)
440        };
441
442        let pack_idx = *self
443            .flow_sources
444            .get(ctx.flow_id)
445            .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
446        let pack = Arc::clone(&self.packs[pack_idx]);
447        let exec_ctx = component_exec_ctx(ctx, node_id);
448        let value = pack
449            .invoke_component(
450                &component_ref,
451                exec_ctx,
452                &operation,
453                config_json,
454                input_json,
455            )
456            .await?;
457
458        Ok(NodeOutput::new(value))
459    }
460
461    pub fn flows(&self) -> &[FlowDescriptor] {
462        &self.flows
463    }
464
465    pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
466        self.flows
467            .iter()
468            .find(|descriptor| descriptor.flow_type == flow_type)
469    }
470
471    pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
472        self.flows
473            .iter()
474            .find(|descriptor| descriptor.id == flow_id)
475    }
476}
477
/// Hooks for observing node execution within a flow.
///
/// Observers are held by reference in `FlowContext` and are passed on unchanged to
/// nested `flow.call` executions.
pub trait ExecutionObserver: Send + Sync {
    /// Hook for a node that is about to be dispatched.
    fn on_node_start(&self, event: &NodeEvent<'_>);
    /// Hook for a node that produced `output`.
    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
    /// Hook for a node that failed with `error`.
    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
}
483
/// Borrowed view of a node being executed, handed to `ExecutionObserver` hooks.
pub struct NodeEvent<'a> {
    /// Context of the flow invocation the node belongs to.
    pub context: &'a FlowContext<'a>,
    /// Id of the node.
    pub node_id: &'a str,
    /// The node's IR definition.
    pub node: &'a NodeIR,
    /// The node's payload expression as taken from the IR.
    pub payload: &'a Value,
}
490
/// Mutable state threaded through a flow run; also persisted inside `FlowSnapshot`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionState {
    /// Flow input (replaced via `replace_input` on resume).
    input: Value,
    /// Output of each executed node, keyed by node id.
    nodes: HashMap<String, NodeOutput>,
    /// Payloads queued by `emit*` nodes, folded into the flow output by `finalize_with`.
    egress: Vec<Value>,
}
497
498impl ExecutionState {
499    fn new(input: Value) -> Self {
500        Self {
501            input,
502            nodes: HashMap::new(),
503            egress: Vec::new(),
504        }
505    }
506
507    fn context(&self) -> Value {
508        let mut nodes = JsonMap::new();
509        for (id, output) in &self.nodes {
510            nodes.insert(
511                id.clone(),
512                json!({
513                    "ok": output.ok,
514                    "payload": output.payload.clone(),
515                    "meta": output.meta.clone(),
516                }),
517            );
518        }
519        json!({
520            "input": self.input.clone(),
521            "nodes": nodes,
522        })
523    }
524    fn push_egress(&mut self, payload: Value) {
525        self.egress.push(payload);
526    }
527
528    fn replace_input(&mut self, input: Value) {
529        self.input = input;
530    }
531
532    fn clear_egress(&mut self) {
533        self.egress.clear();
534    }
535
536    fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
537        if self.egress.is_empty() {
538            return final_payload.unwrap_or(Value::Null);
539        }
540        let mut emitted = std::mem::take(&mut self.egress);
541        if let Some(value) = final_payload {
542            match value {
543                Value::Null => {}
544                Value::Array(items) => emitted.extend(items),
545                other => emitted.push(other),
546            }
547        }
548        Value::Array(emitted)
549    }
550}
551
/// Recorded result of a single node execution.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct NodeOutput {
    /// Success flag exposed to later nodes through `ExecutionState::context`.
    ok: bool,
    /// Value produced by the node.
    payload: Value,
    /// Extra metadata; `NodeOutput::new` leaves it `Null`.
    meta: Value,
}
558
559impl NodeOutput {
560    fn new(payload: Value) -> Self {
561        Self {
562            ok: true,
563            payload,
564            meta: Value::Null,
565        }
566    }
567}
568
/// What dispatching a node produced.
struct DispatchOutcome {
    /// The node's output, recorded into the execution state.
    output: NodeOutput,
    /// Reason reported by a wait node; `None` both for ordinary nodes and for a wait
    /// without a reason (the two are not distinguished by this type).
    wait_reason: Option<String>,
}
573
574impl DispatchOutcome {
575    fn complete(output: NodeOutput) -> Self {
576        Self {
577            output,
578            wait_reason: None,
579        }
580    }
581
582    fn wait(output: NodeOutput, reason: Option<String>) -> Self {
583        Self {
584            output,
585            wait_reason: reason,
586        }
587    }
588}
589
590fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
591    ComponentExecCtx {
592        tenant: ComponentTenantCtx {
593            tenant: ctx.tenant.to_string(),
594            team: None,
595            user: ctx.provider_id.map(str::to_string),
596            trace_id: None,
597            correlation_id: ctx.session_id.map(str::to_string),
598            deadline_unix_ms: None,
599            attempt: 1,
600            idempotency_key: ctx.session_id.map(str::to_string),
601        },
602        flow_id: ctx.flow_id.to_string(),
603        node_id: Some(node_id.to_string()),
604    }
605}
606
607fn extract_wait_reason(payload: &Value) -> Option<String> {
608    match payload {
609        Value::String(s) => Some(s.clone()),
610        Value::Object(map) => map
611            .get("reason")
612            .and_then(Value::as_str)
613            .map(|value| value.to_string()),
614        _ => None,
615    }
616}
617
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// `context()` must expose the flow input and every recorded node output.
    #[test]
    fn context_exposes_input_and_node_outputs() {
        let mut state = ExecutionState::new(json!({ "city": "London" }));
        state.nodes.insert(
            "forecast".to_string(),
            NodeOutput::new(json!({ "temp": "20C" })),
        );

        let ctx = state.context();
        assert_eq!(ctx["input"]["city"], json!("London"));
        assert_eq!(ctx["nodes"]["forecast"]["ok"], json!(true));
        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
        assert_eq!(ctx["nodes"]["forecast"]["meta"], json!(null));
    }

    #[test]
    fn finalize_wraps_emitted_payloads() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "first" }));
        state.push_egress(json!({ "text": "second" }));
        let result = state.finalize_with(Some(json!({ "text": "final" })));
        assert_eq!(
            result,
            json!([
                { "text": "first" },
                { "text": "second" },
                { "text": "final" }
            ])
        );
    }

    #[test]
    fn finalize_flattens_final_array() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "only" }));
        let result = state.finalize_with(Some(json!([
            { "text": "extra-1" },
            { "text": "extra-2" }
        ])));
        assert_eq!(
            result,
            json!([
                { "text": "only" },
                { "text": "extra-1" },
                { "text": "extra-2" }
            ])
        );
    }

    /// Without emitted payloads the final payload passes through untouched.
    #[test]
    fn finalize_without_egress_returns_payload_unchanged() {
        let state = ExecutionState::new(json!({}));
        assert_eq!(state.finalize_with(Some(json!([1, 2]))), json!([1, 2]));
        let state = ExecutionState::new(json!({}));
        assert_eq!(state.finalize_with(None), json!(null));
    }

    /// A `Null` final payload is not appended after emitted payloads.
    #[test]
    fn finalize_skips_null_final_payload() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "only" }));
        assert_eq!(
            state.finalize_with(Some(json!(null))),
            json!([{ "text": "only" }])
        );
    }

    #[test]
    fn clear_egress_drops_emitted_payloads() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!("queued"));
        state.clear_egress();
        assert_eq!(state.finalize_with(Some(json!("final"))), json!("final"));
    }

    #[test]
    fn wait_reason_is_read_from_string_or_reason_field() {
        assert_eq!(
            extract_wait_reason(&json!("approval")),
            Some("approval".to_string())
        );
        assert_eq!(
            extract_wait_reason(&json!({ "reason": "timer" })),
            Some("timer".to_string())
        );
        assert_eq!(extract_wait_reason(&json!({ "reason": 5 })), None);
        assert_eq!(extract_wait_reason(&json!({})), None);
        assert_eq!(extract_wait_reason(&json!(null)), None);
    }

    #[test]
    fn should_retry_matches_transient_markers_case_insensitively() {
        assert!(should_retry(&anyhow::anyhow!("Service Unavailable")));
        assert!(should_retry(&anyhow::anyhow!("TRANSIENT network error")));
        assert!(should_retry(&anyhow::anyhow!("internal error")));
        assert!(!should_retry(&anyhow::anyhow!("invalid payload")));
    }
}
670
671use tracing::Instrument;
672
/// Per-invocation parameters for executing or resuming a flow.
pub struct FlowContext<'a> {
    /// Tenant the flow runs for.
    pub tenant: &'a str,
    /// Id of the flow to execute.
    pub flow_id: &'a str,
    /// Optional node id, recorded on the tracing span and the flow context.
    pub node_id: Option<&'a str>,
    /// Optional tool name, recorded on the tracing span.
    pub tool: Option<&'a str>,
    /// Optional action name, recorded on the tracing span (`"flow.call"` for nested calls).
    pub action: Option<&'a str>,
    /// Session id; forwarded to components as correlation id and idempotency key.
    pub session_id: Option<&'a str>,
    /// Provider id; forwarded to components as the tenant `user`.
    pub provider_id: Option<&'a str>,
    /// Retry policy applied by `FlowEngine::execute`.
    pub retry_config: RetryConfig,
    /// Optional observer for node execution events.
    pub observer: Option<&'a dyn ExecutionObserver>,
    /// Optional mock layer, propagated to nested `flow.call` contexts.
    /// NOTE(review): not otherwise consulted in this file — confirm where it is used.
    pub mocks: Option<&'a MockLayer>,
}
685
/// Retry policy for whole-flow execution attempts.
#[derive(Copy, Clone)]
pub struct RetryConfig {
    /// Total attempts including the first; `FlowEngine::execute` treats 0 like 1.
    pub max_attempts: u32,
    /// Base delay passed to `backoff_delay_ms` between attempts.
    pub base_delay_ms: u64,
}
691
692fn should_retry(err: &anyhow::Error) -> bool {
693    let lower = err.to_string().to_lowercase();
694    lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
695}
696
697impl From<FlowRetryConfig> for RetryConfig {
698    fn from(value: FlowRetryConfig) -> Self {
699        Self {
700            max_attempts: value.max_attempts.max(1),
701            base_delay_ms: value.base_delay_ms.max(50),
702        }
703    }
704}