greentic_runner_host/runner/engine.rs

1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::sync::Arc;
5use std::time::Duration;
6
7use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
8use anyhow::{Context, Result, anyhow, bail};
9use greentic_flow::ir::{FlowIR, NodeIR};
10use parking_lot::RwLock;
11use serde::{Deserialize, Serialize};
12use serde_json::{Map as JsonMap, Value, json};
13use tokio::task;
14
15use super::mocks::MockLayer;
16use crate::config::{FlowRetryConfig, HostConfig};
17use crate::pack::{FlowDescriptor, PackRuntime};
18use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
19
/// Runs the flows exposed by a set of loaded packs.
pub struct FlowEngine {
    /// Loaded packs; the values in `flow_sources` index into this list.
    packs: Vec<Arc<PackRuntime>>,
    /// One descriptor per registered flow id. When several packs declare the same id, the
    /// descriptor from the later pack is kept.
    flows: Vec<FlowDescriptor>,
    /// Maps a flow id to the index (in `packs`) of the pack that provides it.
    flow_sources: HashMap<String, usize>,
    /// Cache of flow IR keyed by flow id. It is preloaded in `new` and filled lazily for
    /// flows whose preload failed.
    flow_ir: RwLock<HashMap<String, FlowIR>>,
    /// Environment name handed to telemetry. It is read from `GREENTIC_ENV` and defaults
    /// to `local`.
    default_env: String,
}
27
/// Serializable state needed to resume a flow that paused at a `session.wait` node.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FlowSnapshot {
    /// Flow this snapshot belongs to. `FlowEngine::resume` rejects a context for another flow.
    pub flow_id: String,
    /// Node to continue from: the route target of the wait node that paused the flow.
    pub next_node: String,
    /// Execution state at the pause, with already-emitted egress payloads cleared.
    pub state: ExecutionState,
}
34
/// Describes a paused flow: why it paused and what is needed to resume it.
#[derive(Clone, Debug)]
pub struct FlowWait {
    /// Reason taken from the `session.wait` node payload, if it provided one.
    pub reason: Option<String>,
    /// State to hand back to `FlowEngine::resume`.
    pub snapshot: FlowSnapshot,
}
40
/// Whether a flow run finished or paused.
#[derive(Clone, Debug)]
pub enum FlowStatus {
    /// The flow took an exit (`out`) route or reached a node with no usable route.
    Completed,
    /// The flow paused at a `session.wait` node and can be resumed from the snapshot.
    Waiting(FlowWait),
}
46
47#[derive(Clone, Debug)]
48pub struct FlowExecution {
49    pub output: Value,
50    pub status: FlowStatus,
51}
52
53impl FlowExecution {
54    fn completed(output: Value) -> Self {
55        Self {
56            output,
57            status: FlowStatus::Completed,
58        }
59    }
60
61    fn waiting(output: Value, wait: FlowWait) -> Self {
62        Self {
63            output,
64            status: FlowStatus::Waiting(wait),
65        }
66    }
67}
68
69impl FlowEngine {
70    pub async fn new(packs: Vec<Arc<PackRuntime>>, _config: Arc<HostConfig>) -> Result<Self> {
71        let mut flow_sources = HashMap::new();
72        let mut descriptors = Vec::new();
73        for (idx, pack) in packs.iter().enumerate() {
74            let flows = pack.list_flows().await?;
75            for flow in flows {
76                tracing::info!(
77                    flow_id = %flow.id,
78                    flow_type = %flow.flow_type,
79                    pack_index = idx,
80                    "registered flow"
81                );
82                flow_sources.insert(flow.id.clone(), idx);
83                descriptors.retain(|existing: &FlowDescriptor| existing.id != flow.id);
84                descriptors.push(flow);
85            }
86        }
87
88        let mut ir_map = HashMap::new();
89        for flow in &descriptors {
90            if let Some(&pack_idx) = flow_sources.get(&flow.id) {
91                let pack_clone = Arc::clone(&packs[pack_idx]);
92                let flow_id = flow.id.clone();
93                let task_flow_id = flow_id.clone();
94                match task::spawn_blocking(move || pack_clone.load_flow_ir(&task_flow_id)).await {
95                    Ok(Ok(ir)) => {
96                        ir_map.insert(flow_id, ir);
97                    }
98                    Ok(Err(err)) => {
99                        tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
100                    }
101                    Err(err) => {
102                        tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
103                    }
104                }
105            }
106        }
107
108        Ok(Self {
109            packs,
110            flows: descriptors,
111            flow_sources,
112            flow_ir: RwLock::new(ir_map),
113            default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
114        })
115    }
116
117    async fn get_or_load_flow_ir(&self, flow_id: &str) -> Result<FlowIR> {
118        if let Some(ir) = self.flow_ir.read().get(flow_id).cloned() {
119            return Ok(ir);
120        }
121
122        let pack_idx = *self
123            .flow_sources
124            .get(flow_id)
125            .with_context(|| format!("flow {flow_id} not registered"))?;
126        let pack = Arc::clone(&self.packs[pack_idx]);
127        let flow_id_owned = flow_id.to_string();
128        let task_flow_id = flow_id_owned.clone();
129        let ir = task::spawn_blocking(move || pack.load_flow_ir(&task_flow_id))
130            .await
131            .context("failed to join flow metadata task")??;
132        self.flow_ir
133            .write()
134            .insert(flow_id_owned.clone(), ir.clone());
135        Ok(ir)
136    }
137
138    pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
139        let span = tracing::info_span!(
140            "flow.execute",
141            tenant = tracing::field::Empty,
142            flow_id = tracing::field::Empty,
143            node_id = tracing::field::Empty,
144            tool = tracing::field::Empty,
145            action = tracing::field::Empty
146        );
147        annotate_span(
148            &span,
149            &FlowSpanAttributes {
150                tenant: ctx.tenant,
151                flow_id: ctx.flow_id,
152                node_id: ctx.node_id,
153                tool: ctx.tool,
154                action: ctx.action,
155            },
156        );
157        set_flow_context(
158            &self.default_env,
159            ctx.tenant,
160            ctx.flow_id,
161            ctx.node_id,
162            ctx.provider_id,
163            ctx.session_id,
164        );
165        let retry_config = ctx.retry_config;
166        let original_input = input;
167        async move {
168            let mut attempt = 0u32;
169            loop {
170                attempt += 1;
171                match self.execute_once(&ctx, original_input.clone()).await {
172                    Ok(value) => return Ok(value),
173                    Err(err) => {
174                        if attempt >= retry_config.max_attempts || !should_retry(&err) {
175                            return Err(err);
176                        }
177                        let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
178                        tracing::warn!(
179                            tenant = ctx.tenant,
180                            flow_id = ctx.flow_id,
181                            attempt,
182                            max_attempts = retry_config.max_attempts,
183                            delay_ms = delay,
184                            error = %err,
185                            "transient flow execution failure, backing off"
186                        );
187                        tokio::time::sleep(Duration::from_millis(delay)).await;
188                    }
189                }
190            }
191        }
192        .instrument(span)
193        .await
194    }
195
196    pub async fn resume(
197        &self,
198        ctx: FlowContext<'_>,
199        snapshot: FlowSnapshot,
200        input: Value,
201    ) -> Result<FlowExecution> {
202        if snapshot.flow_id != ctx.flow_id {
203            bail!(
204                "snapshot flow {} does not match requested {}",
205                snapshot.flow_id,
206                ctx.flow_id
207            );
208        }
209        let flow_ir = self.get_or_load_flow_ir(ctx.flow_id).await?;
210        let mut state = snapshot.state;
211        state.replace_input(input);
212        self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
213            .await
214    }
215
216    async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
217        let flow_ir = self.get_or_load_flow_ir(ctx.flow_id).await?;
218        let state = ExecutionState::new(input);
219        self.drive_flow(ctx, flow_ir, state, None).await
220    }
221
222    async fn drive_flow(
223        &self,
224        ctx: &FlowContext<'_>,
225        flow_ir: FlowIR,
226        mut state: ExecutionState,
227        resume_from: Option<String>,
228    ) -> Result<FlowExecution> {
229        let mut current = flow_ir
230            .start
231            .clone()
232            .or_else(|| flow_ir.nodes.keys().next().cloned())
233            .with_context(|| format!("flow {} has no start node", flow_ir.id))?;
234        if let Some(resume) = resume_from {
235            current = resume;
236        }
237        let mut final_payload = None;
238
239        loop {
240            let node = flow_ir
241                .nodes
242                .get(&current)
243                .with_context(|| format!("node {current} not found"))?;
244
245            let payload = node.payload_expr.clone();
246            let observed_payload = payload.clone();
247            let node_id = current.clone();
248            let event = NodeEvent {
249                context: ctx,
250                node_id: &node_id,
251                node,
252                payload: &observed_payload,
253            };
254            if let Some(observer) = ctx.observer {
255                observer.on_node_start(&event);
256            }
257            let DispatchOutcome {
258                output,
259                wait_reason,
260            } = self
261                .dispatch_node(ctx, &current, node, &mut state, payload)
262                .await?;
263
264            state.nodes.insert(current.clone(), output.clone());
265
266            let mut next = None;
267            let mut should_exit = false;
268            for route in &node.routes {
269                if route.out || matches!(route.to.as_deref(), Some("out")) {
270                    final_payload = Some(output.payload.clone());
271                    should_exit = true;
272                    break;
273                }
274                if let Some(to) = &route.to {
275                    next = Some(to.clone());
276                    break;
277                }
278            }
279
280            if let Some(wait_reason) = wait_reason {
281                let resume_target = next.clone().ok_or_else(|| {
282                    anyhow!("session.wait node {current} requires a non-empty route")
283                })?;
284                let mut snapshot_state = state.clone();
285                snapshot_state.clear_egress();
286                let snapshot = FlowSnapshot {
287                    flow_id: ctx.flow_id.to_string(),
288                    next_node: resume_target,
289                    state: snapshot_state,
290                };
291                let output_value = state.clone().finalize_with(None);
292                return Ok(FlowExecution::waiting(
293                    output_value,
294                    FlowWait {
295                        reason: Some(wait_reason),
296                        snapshot,
297                    },
298                ));
299            }
300
301            if should_exit {
302                break;
303            }
304
305            match next {
306                Some(n) => current = n,
307                None => {
308                    final_payload = Some(output.payload.clone());
309                    break;
310                }
311            }
312        }
313
314        let payload = final_payload.unwrap_or(Value::Null);
315        Ok(FlowExecution::completed(state.finalize_with(Some(payload))))
316    }
317
318    async fn dispatch_node(
319        &self,
320        ctx: &FlowContext<'_>,
321        node_id: &str,
322        node: &NodeIR,
323        state: &mut ExecutionState,
324        payload: Value,
325    ) -> Result<DispatchOutcome> {
326        match node.component.as_str() {
327            "component.exec" => self
328                .execute_component_exec(ctx, node_id, state, payload)
329                .await
330                .map(DispatchOutcome::complete),
331            "flow.call" => self
332                .execute_flow_call(ctx, payload)
333                .await
334                .map(DispatchOutcome::complete),
335            component if component.starts_with("emit") => {
336                state.push_egress(payload.clone());
337                Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
338            }
339            "session.wait" => {
340                let reason = extract_wait_reason(&payload);
341                Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
342            }
343            other => bail!("unsupported node component: {other}"),
344        }
345    }
346
347    async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
348        #[derive(Deserialize)]
349        struct FlowCallPayload {
350            #[serde(alias = "flow")]
351            flow_id: String,
352            #[serde(default)]
353            input: Value,
354        }
355
356        let call: FlowCallPayload =
357            serde_json::from_value(payload).context("invalid payload for flow.call node")?;
358        if call.flow_id.trim().is_empty() {
359            bail!("flow.call requires a non-empty flow_id");
360        }
361
362        let sub_input = if call.input.is_null() {
363            Value::Null
364        } else {
365            call.input
366        };
367
368        let flow_id_owned = call.flow_id;
369        let action = "flow.call";
370        let sub_ctx = FlowContext {
371            tenant: ctx.tenant,
372            flow_id: flow_id_owned.as_str(),
373            node_id: None,
374            tool: ctx.tool,
375            action: Some(action),
376            session_id: ctx.session_id,
377            provider_id: ctx.provider_id,
378            retry_config: ctx.retry_config,
379            observer: ctx.observer,
380            mocks: ctx.mocks,
381        };
382
383        let execution = Box::pin(self.execute(sub_ctx, sub_input))
384            .await
385            .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
386        match execution.status {
387            FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
388            FlowStatus::Waiting(wait) => bail!(
389                "flow.call cannot pause (flow {} waiting {:?})",
390                flow_id_owned,
391                wait.reason
392            ),
393        }
394    }
395
396    async fn execute_component_exec(
397        &self,
398        ctx: &FlowContext<'_>,
399        node_id: &str,
400        state: &ExecutionState,
401        payload: Value,
402    ) -> Result<NodeOutput> {
403        #[derive(Deserialize)]
404        struct ComponentPayload {
405            #[serde(default, alias = "component_ref", alias = "component")]
406            component: Option<String>,
407            #[serde(alias = "op")]
408            operation: Option<String>,
409            #[serde(default)]
410            input: Value,
411            #[serde(default)]
412            config: Value,
413        }
414
415        let payload: ComponentPayload =
416            serde_json::from_value(payload).context("invalid payload for component.exec")?;
417        let component_ref = payload
418            .component
419            .filter(|v| !v.trim().is_empty())
420            .with_context(|| "component.exec requires a component_ref")?;
421        let operation = payload
422            .operation
423            .filter(|v| !v.trim().is_empty())
424            .with_context(|| "component.exec requires an operation")?;
425        let mut input = payload.input;
426        if let Value::Object(mut map) = input {
427            map.entry("state".to_string())
428                .or_insert_with(|| state.context());
429            input = Value::Object(map);
430        }
431        let input_json = serde_json::to_string(&input)?;
432        let config_json = if payload.config.is_null() {
433            None
434        } else {
435            Some(serde_json::to_string(&payload.config)?)
436        };
437
438        let pack_idx = *self
439            .flow_sources
440            .get(ctx.flow_id)
441            .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
442        let pack = Arc::clone(&self.packs[pack_idx]);
443        let exec_ctx = component_exec_ctx(ctx, node_id);
444        let value = pack
445            .invoke_component(
446                &component_ref,
447                exec_ctx,
448                &operation,
449                config_json,
450                input_json,
451            )
452            .await?;
453
454        Ok(NodeOutput::new(value))
455    }
456
457    pub fn flows(&self) -> &[FlowDescriptor] {
458        &self.flows
459    }
460
461    pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
462        self.flows
463            .iter()
464            .find(|descriptor| descriptor.flow_type == flow_type)
465    }
466
467    pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
468        self.flows
469            .iter()
470            .find(|descriptor| descriptor.id == flow_id)
471    }
472}
473
/// Hooks into node execution performed by the flow engine.
///
/// Implementations must be `Send + Sync`.
pub trait ExecutionObserver: Send + Sync {
    /// Hook invoked before a node is dispatched.
    fn on_node_start(&self, event: &NodeEvent<'_>);
    /// Hook for a node that completed, together with its output value.
    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
    /// Hook for a node that failed, together with its error.
    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
}
479
/// Identifies the node an [`ExecutionObserver`] hook refers to.
pub struct NodeEvent<'a> {
    /// Context of the flow run the node belongs to.
    pub context: &'a FlowContext<'a>,
    /// Id of the node within its flow.
    pub node_id: &'a str,
    /// The node's IR definition.
    pub node: &'a NodeIR,
    /// The node's payload as declared in the IR (`payload_expr`), before dispatch.
    pub payload: &'a Value,
}
486
487#[derive(Clone, Debug, Serialize, Deserialize)]
488pub struct ExecutionState {
489    input: Value,
490    nodes: HashMap<String, NodeOutput>,
491    egress: Vec<Value>,
492}
493
494impl ExecutionState {
495    fn new(input: Value) -> Self {
496        Self {
497            input,
498            nodes: HashMap::new(),
499            egress: Vec::new(),
500        }
501    }
502
503    fn context(&self) -> Value {
504        let mut nodes = JsonMap::new();
505        for (id, output) in &self.nodes {
506            nodes.insert(
507                id.clone(),
508                json!({
509                    "ok": output.ok,
510                    "payload": output.payload.clone(),
511                    "meta": output.meta.clone(),
512                }),
513            );
514        }
515        json!({
516            "input": self.input.clone(),
517            "nodes": nodes,
518        })
519    }
520    fn push_egress(&mut self, payload: Value) {
521        self.egress.push(payload);
522    }
523
524    fn replace_input(&mut self, input: Value) {
525        self.input = input;
526    }
527
528    fn clear_egress(&mut self) {
529        self.egress.clear();
530    }
531
532    fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
533        if self.egress.is_empty() {
534            return final_payload.unwrap_or(Value::Null);
535        }
536        let mut emitted = std::mem::take(&mut self.egress);
537        if let Some(value) = final_payload {
538            match value {
539                Value::Null => {}
540                Value::Array(items) => emitted.extend(items),
541                other => emitted.push(other),
542            }
543        }
544        Value::Array(emitted)
545    }
546}
547
548#[derive(Clone, Debug, Serialize, Deserialize)]
549struct NodeOutput {
550    ok: bool,
551    payload: Value,
552    meta: Value,
553}
554
555impl NodeOutput {
556    fn new(payload: Value) -> Self {
557        Self {
558            ok: true,
559            payload,
560            meta: Value::Null,
561        }
562    }
563}
564
565struct DispatchOutcome {
566    output: NodeOutput,
567    wait_reason: Option<String>,
568}
569
570impl DispatchOutcome {
571    fn complete(output: NodeOutput) -> Self {
572        Self {
573            output,
574            wait_reason: None,
575        }
576    }
577
578    fn wait(output: NodeOutput, reason: Option<String>) -> Self {
579        Self {
580            output,
581            wait_reason: reason,
582        }
583    }
584}
585
586fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
587    ComponentExecCtx {
588        tenant: ComponentTenantCtx {
589            tenant: ctx.tenant.to_string(),
590            team: None,
591            user: ctx.provider_id.map(str::to_string),
592            trace_id: None,
593            correlation_id: ctx.session_id.map(str::to_string),
594            deadline_unix_ms: None,
595            attempt: 1,
596            idempotency_key: ctx.session_id.map(str::to_string),
597        },
598        flow_id: ctx.flow_id.to_string(),
599        node_id: Some(node_id.to_string()),
600    }
601}
602
603fn extract_wait_reason(payload: &Value) -> Option<String> {
604    match payload {
605        Value::String(s) => Some(s.clone()),
606        Value::Object(map) => map
607            .get("reason")
608            .and_then(Value::as_str)
609            .map(|value| value.to_string()),
610        _ => None,
611    }
612}
613
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// `context()` exposes the flow input and every recorded node output.
    #[test]
    fn context_includes_input_and_node_outputs() {
        let mut state = ExecutionState::new(json!({ "city": "London" }));
        state.nodes.insert(
            "forecast".to_string(),
            NodeOutput::new(json!({ "temp": "20C" })),
        );

        let ctx = state.context();
        assert_eq!(ctx["input"]["city"], json!("London"));
        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
        assert_eq!(ctx["nodes"]["forecast"]["ok"], json!(true));
        assert_eq!(ctx["nodes"]["forecast"]["meta"], Value::Null);
    }

    #[test]
    fn finalize_wraps_emitted_payloads() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "first" }));
        state.push_egress(json!({ "text": "second" }));
        let result = state.finalize_with(Some(json!({ "text": "final" })));
        assert_eq!(
            result,
            json!([
                { "text": "first" },
                { "text": "second" },
                { "text": "final" }
            ])
        );
    }

    #[test]
    fn finalize_flattens_final_array() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "only" }));
        let result = state.finalize_with(Some(json!([
            { "text": "extra-1" },
            { "text": "extra-2" }
        ])));
        assert_eq!(
            result,
            json!([
                { "text": "only" },
                { "text": "extra-1" },
                { "text": "extra-2" }
            ])
        );
    }

    /// Without egress, the final payload is returned untouched (arrays are not re-wrapped).
    #[test]
    fn finalize_without_egress_returns_final_payload() {
        let state = ExecutionState::new(json!({}));
        assert_eq!(
            state.clone().finalize_with(Some(json!([1, 2]))),
            json!([1, 2])
        );
        assert_eq!(state.finalize_with(None), Value::Null);
    }

    /// A `null` final payload is not appended to the emitted payloads.
    #[test]
    fn finalize_skips_null_final_payload() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!("only"));
        assert_eq!(state.finalize_with(Some(Value::Null)), json!(["only"]));
    }

    #[test]
    fn wait_reason_from_string_or_object() {
        assert_eq!(
            extract_wait_reason(&json!("later")),
            Some("later".to_string())
        );
        assert_eq!(
            extract_wait_reason(&json!({ "reason": "approval" })),
            Some("approval".to_string())
        );
        assert_eq!(extract_wait_reason(&json!({ "reason": 5 })), None);
        assert_eq!(extract_wait_reason(&json!(null)), None);
    }
}
666
667use tracing::Instrument;
668
/// Per-invocation inputs for `FlowEngine::execute` and `FlowEngine::resume`.
pub struct FlowContext<'a> {
    /// Tenant the flow runs for. Annotated on the flow span and passed to component calls.
    pub tenant: &'a str,
    /// Id of the flow to run. It must be registered with the engine.
    pub flow_id: &'a str,
    /// Optional node id. In this file it is only used for span/telemetry annotation.
    pub node_id: Option<&'a str>,
    /// Optional tool name. Annotated on the flow span and copied into `flow.call` sub-flows.
    pub tool: Option<&'a str>,
    /// Optional action name. Annotated on the flow span; `flow.call` sub-flows use `flow.call`.
    pub action: Option<&'a str>,
    /// Optional session id. Also used as correlation id and idempotency key for component calls.
    pub session_id: Option<&'a str>,
    /// Optional provider id. Also passed to components as the `user` of their tenant context.
    pub provider_id: Option<&'a str>,
    /// Retry policy applied by `FlowEngine::execute`.
    pub retry_config: RetryConfig,
    /// Optional observer notified as nodes are dispatched.
    pub observer: Option<&'a dyn ExecutionObserver>,
    /// Optional mock layer. NOTE(review): the engine code visible here only forwards it to
    /// `flow.call` sub-flows and never reads it.
    pub mocks: Option<&'a MockLayer>,
}
681
/// Retry policy applied by `FlowEngine::execute`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct RetryConfig {
    /// Total attempts, including the first one. Values of 0 or 1 disable retries.
    pub max_attempts: u32,
    /// Base delay, in milliseconds, fed to the backoff calculation before each retry.
    pub base_delay_ms: u64,
}
687
688fn should_retry(err: &anyhow::Error) -> bool {
689    let lower = err.to_string().to_lowercase();
690    lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
691}
692
693impl From<FlowRetryConfig> for RetryConfig {
694    fn from(value: FlowRetryConfig) -> Self {
695        Self {
696            max_attempts: value.max_attempts.max(1),
697            base_delay_ms: value.base_delay_ms.max(50),
698        }
699    }
700}