Skip to main content

greentic_runner_host/runner/
engine.rs

1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use super::templating::{TemplateOptions, render_template_value};
18use crate::config::{FlowRetryConfig, HostConfig};
19use crate::pack::{FlowDescriptor, PackRuntime};
20use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
21use greentic_types::{Flow, Node, NodeId, Routing};
22
23pub struct FlowEngine {
24    packs: Vec<Arc<PackRuntime>>,
25    flows: Vec<FlowDescriptor>,
26    flow_sources: HashMap<String, usize>,
27    flow_cache: RwLock<HashMap<String, HostFlow>>,
28    default_env: String,
29}
30
31#[derive(Clone, Debug, Serialize, Deserialize)]
32pub struct FlowSnapshot {
33    pub flow_id: String,
34    pub next_node: String,
35    pub state: ExecutionState,
36}
37
38#[derive(Clone, Debug)]
39pub struct FlowWait {
40    pub reason: Option<String>,
41    pub snapshot: FlowSnapshot,
42}
43
44#[derive(Clone, Debug)]
45pub enum FlowStatus {
46    Completed,
47    Waiting(Box<FlowWait>),
48}
49
50#[derive(Clone, Debug)]
51pub struct FlowExecution {
52    pub output: Value,
53    pub status: FlowStatus,
54}
55
56#[derive(Clone, Debug)]
57struct HostFlow {
58    id: String,
59    start: Option<NodeId>,
60    nodes: IndexMap<NodeId, HostNode>,
61}
62
63#[derive(Clone, Debug)]
64pub struct HostNode {
65    kind: NodeKind,
66    /// Backwards-compatible component label for observers/transcript.
67    pub component: String,
68    component_id: String,
69    operation_name: Option<String>,
70    operation_in_mapping: Option<String>,
71    payload_expr: Value,
72    routing: Routing,
73}
74
75#[derive(Clone, Debug)]
76enum NodeKind {
77    Exec { target_component: String },
78    PackComponent { component_ref: String },
79    ProviderInvoke,
80    FlowCall,
81    BuiltinEmit { kind: EmitKind },
82    Wait,
83}
84
/// Flavour of a builtin `emit.*` node.
#[derive(Debug, Clone)]
enum EmitKind {
    /// Log emit.
    Log,
    /// Response emit.
    Response,
    /// Any other emit reference, kept verbatim.
    Other(String),
}
91
/// Node-level values handed to `component.exec` alongside the rendered payload.
struct ComponentOverrides<'a> {
    /// Component reference to use instead of the one in the payload.
    component: Option<&'a str>,
    /// Operation name supplied by the node.
    operation: Option<&'a str>,
}
96
97struct ComponentCall {
98    component_ref: String,
99    operation: String,
100    input: Value,
101    config: Value,
102}
103
104impl FlowExecution {
105    fn completed(output: Value) -> Self {
106        Self {
107            output,
108            status: FlowStatus::Completed,
109        }
110    }
111
112    fn waiting(output: Value, wait: FlowWait) -> Self {
113        Self {
114            output,
115            status: FlowStatus::Waiting(Box::new(wait)),
116        }
117    }
118}
119
120impl FlowEngine {
121    pub async fn new(packs: Vec<Arc<PackRuntime>>, _config: Arc<HostConfig>) -> Result<Self> {
122        let mut flow_sources = HashMap::new();
123        let mut descriptors = Vec::new();
124        for (idx, pack) in packs.iter().enumerate() {
125            let flows = pack.list_flows().await?;
126            for flow in flows {
127                tracing::info!(
128                    flow_id = %flow.id,
129                    flow_type = %flow.flow_type,
130                    pack_index = idx,
131                    "registered flow"
132                );
133                flow_sources.insert(flow.id.clone(), idx);
134                descriptors.retain(|existing: &FlowDescriptor| existing.id != flow.id);
135                descriptors.push(flow);
136            }
137        }
138
139        let mut flow_map = HashMap::new();
140        for flow in &descriptors {
141            if let Some(&pack_idx) = flow_sources.get(&flow.id) {
142                let pack_clone = Arc::clone(&packs[pack_idx]);
143                let flow_id = flow.id.clone();
144                let task_flow_id = flow_id.clone();
145                match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
146                    Ok(Ok(flow)) => {
147                        flow_map.insert(flow_id, HostFlow::from(flow));
148                    }
149                    Ok(Err(err)) => {
150                        tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
151                    }
152                    Err(err) => {
153                        tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
154                    }
155                }
156            }
157        }
158
159        Ok(Self {
160            packs,
161            flows: descriptors,
162            flow_sources,
163            flow_cache: RwLock::new(flow_map),
164            default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
165        })
166    }
167
168    async fn get_or_load_flow(&self, flow_id: &str) -> Result<HostFlow> {
169        if let Some(flow) = self.flow_cache.read().get(flow_id).cloned() {
170            return Ok(flow);
171        }
172
173        let pack_idx = *self
174            .flow_sources
175            .get(flow_id)
176            .with_context(|| format!("flow {flow_id} not registered"))?;
177        let pack = Arc::clone(&self.packs[pack_idx]);
178        let flow_id_owned = flow_id.to_string();
179        let task_flow_id = flow_id_owned.clone();
180        let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
181            .await
182            .context("failed to join flow metadata task")??;
183        let host_flow = HostFlow::from(flow);
184        self.flow_cache
185            .write()
186            .insert(flow_id_owned.clone(), host_flow.clone());
187        Ok(host_flow)
188    }
189
190    pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
191        let span = tracing::info_span!(
192            "flow.execute",
193            tenant = tracing::field::Empty,
194            flow_id = tracing::field::Empty,
195            node_id = tracing::field::Empty,
196            tool = tracing::field::Empty,
197            action = tracing::field::Empty
198        );
199        annotate_span(
200            &span,
201            &FlowSpanAttributes {
202                tenant: ctx.tenant,
203                flow_id: ctx.flow_id,
204                node_id: ctx.node_id,
205                tool: ctx.tool,
206                action: ctx.action,
207            },
208        );
209        set_flow_context(
210            &self.default_env,
211            ctx.tenant,
212            ctx.flow_id,
213            ctx.node_id,
214            ctx.provider_id,
215            ctx.session_id,
216        );
217        let retry_config = ctx.retry_config;
218        let original_input = input;
219        async move {
220            let mut attempt = 0u32;
221            loop {
222                attempt += 1;
223                match self.execute_once(&ctx, original_input.clone()).await {
224                    Ok(value) => return Ok(value),
225                    Err(err) => {
226                        if attempt >= retry_config.max_attempts || !should_retry(&err) {
227                            return Err(err);
228                        }
229                        let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
230                        tracing::warn!(
231                            tenant = ctx.tenant,
232                            flow_id = ctx.flow_id,
233                            attempt,
234                            max_attempts = retry_config.max_attempts,
235                            delay_ms = delay,
236                            error = %err,
237                            "transient flow execution failure, backing off"
238                        );
239                        tokio::time::sleep(Duration::from_millis(delay)).await;
240                    }
241                }
242            }
243        }
244        .instrument(span)
245        .await
246    }
247
248    pub async fn resume(
249        &self,
250        ctx: FlowContext<'_>,
251        snapshot: FlowSnapshot,
252        input: Value,
253    ) -> Result<FlowExecution> {
254        if snapshot.flow_id != ctx.flow_id {
255            bail!(
256                "snapshot flow {} does not match requested {}",
257                snapshot.flow_id,
258                ctx.flow_id
259            );
260        }
261        let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
262        let mut state = snapshot.state;
263        state.replace_input(input);
264        state.ensure_entry();
265        self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
266            .await
267    }
268
269    async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
270        let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
271        let state = ExecutionState::new(input);
272        self.drive_flow(ctx, flow_ir, state, None).await
273    }
274
275    async fn drive_flow(
276        &self,
277        ctx: &FlowContext<'_>,
278        flow_ir: HostFlow,
279        mut state: ExecutionState,
280        resume_from: Option<String>,
281    ) -> Result<FlowExecution> {
282        let mut current = match resume_from {
283            Some(node) => NodeId::from_str(&node)
284                .with_context(|| format!("invalid resume node id `{node}`"))?,
285            None => flow_ir
286                .start
287                .clone()
288                .or_else(|| flow_ir.nodes.keys().next().cloned())
289                .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
290        };
291
292        loop {
293            let node = flow_ir
294                .nodes
295                .get(&current)
296                .with_context(|| format!("node {} not found", current.as_str()))?;
297
298            let payload_template = node.payload_expr.clone();
299            let prev = state
300                .last_output
301                .as_ref()
302                .cloned()
303                .unwrap_or_else(|| Value::Object(JsonMap::new()));
304            let ctx_value = template_context(&state, prev);
305            let payload =
306                render_template_value(&payload_template, &ctx_value, TemplateOptions::default())
307                    .context("failed to render node input template")?;
308            let observed_payload = payload.clone();
309            let node_id = current.clone();
310            let event = NodeEvent {
311                context: ctx,
312                node_id: node_id.as_str(),
313                node,
314                payload: &observed_payload,
315            };
316            if let Some(observer) = ctx.observer {
317                observer.on_node_start(&event);
318            }
319            let DispatchOutcome {
320                output,
321                wait_reason,
322            } = self
323                .dispatch_node(ctx, node_id.as_str(), node, &mut state, payload)
324                .await?;
325
326            state.nodes.insert(node_id.clone().into(), output.clone());
327            state.last_output = Some(output.payload.clone());
328            if let Some(observer) = ctx.observer {
329                observer.on_node_end(&event, &output.payload);
330            }
331
332            let (next, should_exit) = match &node.routing {
333                Routing::Next { node_id } => (Some(node_id.clone()), false),
334                Routing::End | Routing::Reply => (None, true),
335                Routing::Branch { default, .. } => (default.clone(), default.is_none()),
336                Routing::Custom(raw) => {
337                    tracing::warn!(
338                        flow_id = %flow_ir.id,
339                        node_id = %node_id,
340                        routing = ?raw,
341                        "unsupported routing; terminating flow"
342                    );
343                    (None, true)
344                }
345            };
346
347            if let Some(wait_reason) = wait_reason {
348                let resume_target = next.clone().ok_or_else(|| {
349                    anyhow!(
350                        "session.wait node {} requires a non-empty route",
351                        current.as_str()
352                    )
353                })?;
354                let mut snapshot_state = state.clone();
355                snapshot_state.clear_egress();
356                let snapshot = FlowSnapshot {
357                    flow_id: ctx.flow_id.to_string(),
358                    next_node: resume_target.as_str().to_string(),
359                    state: snapshot_state,
360                };
361                let output_value = state.clone().finalize_with(None);
362                return Ok(FlowExecution::waiting(
363                    output_value,
364                    FlowWait {
365                        reason: Some(wait_reason),
366                        snapshot,
367                    },
368                ));
369            }
370
371            if should_exit {
372                return Ok(FlowExecution::completed(
373                    state.finalize_with(Some(output.payload.clone())),
374                ));
375            }
376
377            match next {
378                Some(n) => current = n,
379                None => {
380                    return Ok(FlowExecution::completed(
381                        state.finalize_with(Some(output.payload.clone())),
382                    ));
383                }
384            }
385        }
386    }
387
388    async fn dispatch_node(
389        &self,
390        ctx: &FlowContext<'_>,
391        node_id: &str,
392        node: &HostNode,
393        state: &mut ExecutionState,
394        payload: Value,
395    ) -> Result<DispatchOutcome> {
396        match &node.kind {
397            NodeKind::Exec { target_component } => self
398                .execute_component_exec(
399                    ctx,
400                    node_id,
401                    node,
402                    payload,
403                    ComponentOverrides {
404                        component: Some(target_component.as_str()),
405                        operation: node.operation_name.as_deref(),
406                    },
407                )
408                .await
409                .map(DispatchOutcome::complete),
410            NodeKind::PackComponent { component_ref } => self
411                .execute_component_call(ctx, node_id, node, payload, component_ref.as_str())
412                .await
413                .map(DispatchOutcome::complete),
414            NodeKind::FlowCall => self
415                .execute_flow_call(ctx, payload)
416                .await
417                .map(DispatchOutcome::complete),
418            NodeKind::ProviderInvoke => self
419                .execute_provider_invoke(ctx, node_id, state, payload)
420                .await
421                .map(DispatchOutcome::complete),
422            NodeKind::BuiltinEmit { kind } => {
423                match kind {
424                    EmitKind::Log | EmitKind::Response => {}
425                    EmitKind::Other(component) => {
426                        tracing::debug!(%component, "handling emit.* as builtin");
427                    }
428                }
429                state.push_egress(payload.clone());
430                Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
431            }
432            NodeKind::Wait => {
433                let reason = extract_wait_reason(&payload);
434                Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
435            }
436        }
437    }
438
439    async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
440        #[derive(Deserialize)]
441        struct FlowCallPayload {
442            #[serde(alias = "flow")]
443            flow_id: String,
444            #[serde(default)]
445            input: Value,
446        }
447
448        let call: FlowCallPayload =
449            serde_json::from_value(payload).context("invalid payload for flow.call node")?;
450        if call.flow_id.trim().is_empty() {
451            bail!("flow.call requires a non-empty flow_id");
452        }
453
454        let sub_input = if call.input.is_null() {
455            Value::Null
456        } else {
457            call.input
458        };
459
460        let flow_id_owned = call.flow_id;
461        let action = "flow.call";
462        let sub_ctx = FlowContext {
463            tenant: ctx.tenant,
464            flow_id: flow_id_owned.as_str(),
465            node_id: None,
466            tool: ctx.tool,
467            action: Some(action),
468            session_id: ctx.session_id,
469            provider_id: ctx.provider_id,
470            retry_config: ctx.retry_config,
471            observer: ctx.observer,
472            mocks: ctx.mocks,
473        };
474
475        let execution = Box::pin(self.execute(sub_ctx, sub_input))
476            .await
477            .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
478        match execution.status {
479            FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
480            FlowStatus::Waiting(wait) => bail!(
481                "flow.call cannot pause (flow {} waiting {:?})",
482                flow_id_owned,
483                wait.reason
484            ),
485        }
486    }
487
488    async fn execute_component_exec(
489        &self,
490        ctx: &FlowContext<'_>,
491        node_id: &str,
492        node: &HostNode,
493        payload: Value,
494        overrides: ComponentOverrides<'_>,
495    ) -> Result<NodeOutput> {
496        #[derive(Deserialize)]
497        struct ComponentPayload {
498            #[serde(default, alias = "component_ref", alias = "component")]
499            component: Option<String>,
500            #[serde(alias = "op")]
501            operation: Option<String>,
502            #[serde(default)]
503            input: Value,
504            #[serde(default)]
505            config: Value,
506        }
507
508        let payload: ComponentPayload =
509            serde_json::from_value(payload).context("invalid payload for component.exec")?;
510        let component_ref = overrides
511            .component
512            .map(str::to_string)
513            .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
514            .with_context(|| "component.exec requires a component_ref")?;
515        let operation = resolve_component_operation(
516            node_id,
517            node.component_id.as_str(),
518            payload.operation,
519            overrides.operation,
520            node.operation_in_mapping.as_deref(),
521        )?;
522        let call = ComponentCall {
523            component_ref,
524            operation,
525            input: payload.input,
526            config: payload.config,
527        };
528
529        self.invoke_component_call(ctx, node_id, call).await
530    }
531
532    async fn execute_component_call(
533        &self,
534        ctx: &FlowContext<'_>,
535        node_id: &str,
536        node: &HostNode,
537        payload: Value,
538        component_ref: &str,
539    ) -> Result<NodeOutput> {
540        let payload_operation = extract_operation_from_mapping(&payload);
541        let (input, config) = split_operation_payload(payload);
542        let operation = resolve_component_operation(
543            node_id,
544            node.component_id.as_str(),
545            payload_operation,
546            node.operation_name.as_deref(),
547            node.operation_in_mapping.as_deref(),
548        )?;
549        let call = ComponentCall {
550            component_ref: component_ref.to_string(),
551            operation,
552            input,
553            config,
554        };
555        self.invoke_component_call(ctx, node_id, call).await
556    }
557
558    async fn invoke_component_call(
559        &self,
560        ctx: &FlowContext<'_>,
561        node_id: &str,
562        call: ComponentCall,
563    ) -> Result<NodeOutput> {
564        let input_json = serde_json::to_string(&call.input)?;
565        let config_json = if call.config.is_null() {
566            None
567        } else {
568            Some(serde_json::to_string(&call.config)?)
569        };
570
571        let pack_idx = *self
572            .flow_sources
573            .get(ctx.flow_id)
574            .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
575        let pack = Arc::clone(&self.packs[pack_idx]);
576        let exec_ctx = component_exec_ctx(ctx, node_id);
577        let value = pack
578            .invoke_component(
579                call.component_ref.as_str(),
580                exec_ctx,
581                call.operation.as_str(),
582                config_json,
583                input_json,
584            )
585            .await?;
586
587        Ok(NodeOutput::new(value))
588    }
589
590    async fn execute_provider_invoke(
591        &self,
592        ctx: &FlowContext<'_>,
593        node_id: &str,
594        state: &ExecutionState,
595        payload: Value,
596    ) -> Result<NodeOutput> {
597        #[derive(Deserialize)]
598        struct ProviderPayload {
599            #[serde(default)]
600            provider_id: Option<String>,
601            #[serde(default)]
602            provider_type: Option<String>,
603            #[serde(default, alias = "operation")]
604            op: Option<String>,
605            #[serde(default)]
606            input: Value,
607            #[serde(default)]
608            in_map: Value,
609            #[serde(default)]
610            out_map: Value,
611            #[serde(default)]
612            err_map: Value,
613        }
614
615        let payload: ProviderPayload =
616            serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
617        let op = payload
618            .op
619            .as_deref()
620            .filter(|v| !v.trim().is_empty())
621            .with_context(|| "provider.invoke requires an op")?
622            .to_string();
623
624        let prev = state
625            .last_output
626            .as_ref()
627            .cloned()
628            .unwrap_or_else(|| Value::Object(JsonMap::new()));
629        let base_ctx = template_context(state, prev);
630
631        let input_value = if !payload.in_map.is_null() {
632            let mut ctx_value = base_ctx.clone();
633            if let Value::Object(ref mut map) = ctx_value {
634                map.insert("input".into(), payload.input.clone());
635                map.insert("result".into(), payload.input.clone());
636            }
637            render_template_value(
638                &payload.in_map,
639                &ctx_value,
640                TemplateOptions {
641                    allow_pointer: true,
642                },
643            )
644            .context("failed to render provider.invoke in_map")?
645        } else if !payload.input.is_null() {
646            payload.input
647        } else {
648            Value::Null
649        };
650        let input_json = serde_json::to_vec(&input_value)?;
651
652        let pack_idx = *self
653            .flow_sources
654            .get(ctx.flow_id)
655            .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
656        let pack = Arc::clone(&self.packs[pack_idx]);
657        let binding = pack.resolve_provider(
658            payload.provider_id.as_deref(),
659            payload.provider_type.as_deref(),
660        )?;
661        let exec_ctx = component_exec_ctx(ctx, node_id);
662        let result = pack
663            .invoke_provider(&binding, exec_ctx, &op, input_json)
664            .await?;
665
666        let output = if payload.out_map.is_null() {
667            result
668        } else {
669            let mut ctx_value = base_ctx;
670            if let Value::Object(ref mut map) = ctx_value {
671                map.insert("input".into(), result.clone());
672                map.insert("result".into(), result.clone());
673            }
674            render_template_value(
675                &payload.out_map,
676                &ctx_value,
677                TemplateOptions {
678                    allow_pointer: true,
679                },
680            )
681            .context("failed to render provider.invoke out_map")?
682        };
683        let _ = payload.err_map;
684        Ok(NodeOutput::new(output))
685    }
686
687    pub fn flows(&self) -> &[FlowDescriptor] {
688        &self.flows
689    }
690
691    pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
692        self.flows
693            .iter()
694            .find(|descriptor| descriptor.flow_type == flow_type)
695    }
696
697    pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
698        self.flows
699            .iter()
700            .find(|descriptor| descriptor.id == flow_id)
701    }
702}
703
704pub trait ExecutionObserver: Send + Sync {
705    fn on_node_start(&self, event: &NodeEvent<'_>);
706    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
707    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
708}
709
710pub struct NodeEvent<'a> {
711    pub context: &'a FlowContext<'a>,
712    pub node_id: &'a str,
713    pub node: &'a HostNode,
714    pub payload: &'a Value,
715}
716
717#[derive(Clone, Debug, Serialize, Deserialize)]
718pub struct ExecutionState {
719    #[serde(default)]
720    entry: Value,
721    #[serde(default)]
722    input: Value,
723    #[serde(default)]
724    nodes: HashMap<String, NodeOutput>,
725    #[serde(default)]
726    egress: Vec<Value>,
727    #[serde(default, skip_serializing_if = "Option::is_none")]
728    last_output: Option<Value>,
729}
730
731impl ExecutionState {
732    fn new(input: Value) -> Self {
733        Self {
734            entry: input.clone(),
735            input,
736            nodes: HashMap::new(),
737            egress: Vec::new(),
738            last_output: None,
739        }
740    }
741
742    fn ensure_entry(&mut self) {
743        if self.entry.is_null() {
744            self.entry = self.input.clone();
745        }
746    }
747
748    fn context(&self) -> Value {
749        let mut nodes = JsonMap::new();
750        for (id, output) in &self.nodes {
751            nodes.insert(
752                id.clone(),
753                json!({
754                    "ok": output.ok,
755                    "payload": output.payload.clone(),
756                    "meta": output.meta.clone(),
757                }),
758            );
759        }
760        json!({
761            "entry": self.entry.clone(),
762            "input": self.input.clone(),
763            "nodes": nodes,
764        })
765    }
766
767    fn outputs_map(&self) -> JsonMap<String, Value> {
768        let mut outputs = JsonMap::new();
769        for (id, output) in &self.nodes {
770            outputs.insert(id.clone(), output.payload.clone());
771        }
772        outputs
773    }
774    fn push_egress(&mut self, payload: Value) {
775        self.egress.push(payload);
776    }
777
778    fn replace_input(&mut self, input: Value) {
779        self.input = input;
780    }
781
782    fn clear_egress(&mut self) {
783        self.egress.clear();
784    }
785
786    fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
787        if self.egress.is_empty() {
788            return final_payload.unwrap_or(Value::Null);
789        }
790        let mut emitted = std::mem::take(&mut self.egress);
791        if let Some(value) = final_payload {
792            match value {
793                Value::Null => {}
794                Value::Array(items) => emitted.extend(items),
795                other => emitted.push(other),
796            }
797        }
798        Value::Array(emitted)
799    }
800}
801
802#[derive(Clone, Debug, Serialize, Deserialize)]
803struct NodeOutput {
804    ok: bool,
805    payload: Value,
806    meta: Value,
807}
808
809impl NodeOutput {
810    fn new(payload: Value) -> Self {
811        Self {
812            ok: true,
813            payload,
814            meta: Value::Null,
815        }
816    }
817}
818
819struct DispatchOutcome {
820    output: NodeOutput,
821    wait_reason: Option<String>,
822}
823
824impl DispatchOutcome {
825    fn complete(output: NodeOutput) -> Self {
826        Self {
827            output,
828            wait_reason: None,
829        }
830    }
831
832    fn wait(output: NodeOutput, reason: Option<String>) -> Self {
833        Self {
834            output,
835            wait_reason: reason,
836        }
837    }
838}
839
840fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
841    ComponentExecCtx {
842        tenant: ComponentTenantCtx {
843            tenant: ctx.tenant.to_string(),
844            team: None,
845            user: ctx.provider_id.map(str::to_string),
846            trace_id: None,
847            correlation_id: ctx.session_id.map(str::to_string),
848            deadline_unix_ms: None,
849            attempt: 1,
850            idempotency_key: ctx.session_id.map(str::to_string),
851        },
852        flow_id: ctx.flow_id.to_string(),
853        node_id: Some(node_id.to_string()),
854    }
855}
856
857fn extract_wait_reason(payload: &Value) -> Option<String> {
858    match payload {
859        Value::String(s) => Some(s.clone()),
860        Value::Object(map) => map
861            .get("reason")
862            .and_then(Value::as_str)
863            .map(|value| value.to_string()),
864        _ => None,
865    }
866}
867
868fn template_context(state: &ExecutionState, prev: Value) -> Value {
869    let entry = if state.entry.is_null() {
870        Value::Object(JsonMap::new())
871    } else {
872        state.entry.clone()
873    };
874    let mut ctx = JsonMap::new();
875    ctx.insert("entry".into(), entry);
876    ctx.insert("prev".into(), prev);
877    ctx.insert("node".into(), Value::Object(state.outputs_map()));
878    ctx.insert("state".into(), state.context());
879    Value::Object(ctx)
880}
881
882impl From<Flow> for HostFlow {
883    fn from(value: Flow) -> Self {
884        let mut nodes = IndexMap::new();
885        for (id, node) in value.nodes {
886            nodes.insert(id.clone(), HostNode::from(node));
887        }
888        let start = value
889            .entrypoints
890            .get("default")
891            .and_then(Value::as_str)
892            .and_then(|id| NodeId::from_str(id).ok())
893            .or_else(|| nodes.keys().next().cloned());
894        Self {
895            id: value.id.as_str().to_string(),
896            start,
897            nodes,
898        }
899    }
900}
901
902impl From<Node> for HostNode {
903    fn from(node: Node) -> Self {
904        let component_ref = node.component.id.as_str().to_string();
905        let raw_operation = node.component.operation.clone();
906        let operation_in_mapping = extract_operation_from_mapping(&node.input.mapping);
907        let operation_is_component_exec = raw_operation.as_deref() == Some("component.exec");
908        let operation_is_emit = raw_operation
909            .as_deref()
910            .map(|op| op.starts_with("emit."))
911            .unwrap_or(false);
912        let is_component_exec = component_ref == "component.exec" || operation_is_component_exec;
913
914        let kind = if is_component_exec {
915            let target = if component_ref == "component.exec" {
916                if let Some(op) = raw_operation
917                    .as_deref()
918                    .filter(|op| op.starts_with("emit."))
919                {
920                    op.to_string()
921                } else {
922                    extract_target_component(&node.input.mapping)
923                        .unwrap_or_else(|| "component.exec".to_string())
924                }
925            } else {
926                extract_target_component(&node.input.mapping)
927                    .unwrap_or_else(|| component_ref.clone())
928            };
929            if target.starts_with("emit.") {
930                NodeKind::BuiltinEmit {
931                    kind: emit_kind_from_ref(&target),
932                }
933            } else {
934                NodeKind::Exec {
935                    target_component: target,
936                }
937            }
938        } else if operation_is_emit {
939            NodeKind::BuiltinEmit {
940                kind: emit_kind_from_ref(raw_operation.as_deref().unwrap_or("emit.log")),
941            }
942        } else {
943            match component_ref.as_str() {
944                "flow.call" => NodeKind::FlowCall,
945                "provider.invoke" => NodeKind::ProviderInvoke,
946                "session.wait" => NodeKind::Wait,
947                comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
948                    kind: emit_kind_from_ref(comp),
949                },
950                other => NodeKind::PackComponent {
951                    component_ref: other.to_string(),
952                },
953            }
954        };
955        let component_label = match &kind {
956            NodeKind::Exec { .. } => "component.exec".to_string(),
957            NodeKind::PackComponent { component_ref } => component_ref.clone(),
958            NodeKind::ProviderInvoke => "provider.invoke".to_string(),
959            NodeKind::FlowCall => "flow.call".to_string(),
960            NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
961            NodeKind::Wait => "session.wait".to_string(),
962        };
963        let operation_name = if is_component_exec && operation_is_component_exec {
964            None
965        } else {
966            raw_operation.clone()
967        };
968        let payload_expr = match kind {
969            NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
970            _ => node.input.mapping.clone(),
971        };
972        Self {
973            kind,
974            component: component_label,
975            component_id: if is_component_exec {
976                "component.exec".to_string()
977            } else {
978                component_ref
979            },
980            operation_name,
981            operation_in_mapping,
982            payload_expr,
983            routing: node.routing,
984        }
985    }
986}
987
988fn extract_target_component(payload: &Value) -> Option<String> {
989    match payload {
990        Value::Object(map) => map
991            .get("component")
992            .or_else(|| map.get("component_ref"))
993            .and_then(Value::as_str)
994            .map(|s| s.to_string()),
995        _ => None,
996    }
997}
998
999fn extract_operation_from_mapping(payload: &Value) -> Option<String> {
1000    match payload {
1001        Value::Object(map) => map
1002            .get("operation")
1003            .or_else(|| map.get("op"))
1004            .and_then(Value::as_str)
1005            .map(str::trim)
1006            .filter(|value| !value.is_empty())
1007            .map(|value| value.to_string()),
1008        _ => None,
1009    }
1010}
1011
1012fn extract_emit_payload(payload: &Value) -> Value {
1013    if let Value::Object(map) = payload {
1014        if let Some(input) = map.get("input") {
1015            return input.clone();
1016        }
1017        if let Some(inner) = map.get("payload") {
1018            return inner.clone();
1019        }
1020    }
1021    payload.clone()
1022}
1023
1024fn split_operation_payload(payload: Value) -> (Value, Value) {
1025    if let Value::Object(mut map) = payload.clone()
1026        && map.contains_key("input")
1027    {
1028        let input = map.remove("input").unwrap_or(Value::Null);
1029        let config = map.remove("config").unwrap_or(Value::Null);
1030        let legacy_only = map.keys().all(|key| {
1031            matches!(
1032                key.as_str(),
1033                "operation" | "op" | "component" | "component_ref"
1034            )
1035        });
1036        if legacy_only {
1037            return (input, config);
1038        }
1039    }
1040    (payload, Value::Null)
1041}
1042
1043fn resolve_component_operation(
1044    node_id: &str,
1045    component_label: &str,
1046    payload_operation: Option<String>,
1047    operation_override: Option<&str>,
1048    operation_in_mapping: Option<&str>,
1049) -> Result<String> {
1050    if let Some(op) = operation_override
1051        .map(str::trim)
1052        .filter(|value| !value.is_empty())
1053    {
1054        return Ok(op.to_string());
1055    }
1056
1057    if let Some(op) = payload_operation
1058        .as_deref()
1059        .map(str::trim)
1060        .filter(|value| !value.is_empty())
1061    {
1062        return Ok(op.to_string());
1063    }
1064
1065    let mut message = format!(
1066        "missing operation for node `{}` (component `{}`); expected node.component.operation to be set",
1067        node_id, component_label,
1068    );
1069    if let Some(found) = operation_in_mapping {
1070        message.push_str(&format!(
1071            ". Found operation in input.mapping (`{}`) but this is not used; pack compiler must preserve node.component.operation.",
1072            found
1073        ));
1074    }
1075    bail!(message);
1076}
1077
1078fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
1079    match component_ref {
1080        "emit.log" => EmitKind::Log,
1081        "emit.response" => EmitKind::Response,
1082        other => EmitKind::Other(other.to_string()),
1083    }
1084}
1085
1086fn emit_ref_from_kind(kind: &EmitKind) -> String {
1087    match kind {
1088        EmitKind::Log => "emit.log".to_string(),
1089        EmitKind::Response => "emit.response".to_string(),
1090        EmitKind::Other(other) => other.clone(),
1091    }
1092}
1093
#[cfg(test)]
mod tests {
    use super::*;
    use greentic_types::{
        Flow, FlowComponentRef, FlowId, FlowKind, InputMapping, Node, NodeId, OutputMapping,
        Routing, TelemetryHints,
    };
    use serde_json::json;
    use std::collections::BTreeMap;
    use std::str::FromStr;
    use std::sync::Mutex;
    use tokio::runtime::Runtime;

    /// Engine with no packs, no flows and an empty flow cache.
    fn minimal_engine() -> FlowEngine {
        FlowEngine {
            packs: Vec::new(),
            flows: Vec::new(),
            flow_sources: HashMap::new(),
            flow_cache: RwLock::new(HashMap::new()),
            default_env: "local".to_string(),
        }
    }

    /// Retry policy shared by the tests: a single attempt with a 1 ms base delay.
    fn single_attempt_retry() -> RetryConfig {
        RetryConfig {
            max_attempts: 1,
            base_delay_ms: 1,
        }
    }

    /// Runs `execute_component_exec` for an exec node that has no operation
    /// configured and returns the resulting error message.
    ///
    /// `operation_in_mapping` is stored on the node as-is so tests can check
    /// the hint appended to the error.
    fn missing_operation_message(node_id: &str, operation_in_mapping: Option<&str>) -> String {
        let engine = minimal_engine();
        let rt = Runtime::new().unwrap();
        let ctx = FlowContext {
            tenant: "tenant",
            flow_id: "flow",
            node_id: Some(node_id),
            tool: None,
            action: None,
            session_id: None,
            provider_id: None,
            retry_config: single_attempt_retry(),
            observer: None,
            mocks: None,
        };
        let node = HostNode {
            kind: NodeKind::Exec {
                target_component: "qa.process".into(),
            },
            component: "component.exec".into(),
            component_id: "component.exec".into(),
            operation_name: None,
            operation_in_mapping: operation_in_mapping.map(str::to_string),
            payload_expr: Value::Null,
            routing: Routing::End,
        };
        let payload = json!({ "component": "qa.process" });
        rt.block_on(engine.execute_component_exec(
            &ctx,
            node_id,
            &node,
            payload,
            ComponentOverrides {
                component: None,
                operation: None,
            },
        ))
        .unwrap_err()
        .to_string()
    }

    #[test]
    fn templating_renders_with_partials_and_data() {
        let mut state = ExecutionState::new(json!({ "city": "London" }));
        state.nodes.insert(
            "forecast".to_string(),
            NodeOutput::new(json!({ "temp": "20C" })),
        );

        // templating context includes node outputs for runner-side payload rendering.
        let ctx = state.context();
        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
    }

    #[test]
    fn finalize_wraps_emitted_payloads() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "first" }));
        state.push_egress(json!({ "text": "second" }));
        let result = state.finalize_with(Some(json!({ "text": "final" })));
        assert_eq!(
            result,
            json!([
                { "text": "first" },
                { "text": "second" },
                { "text": "final" }
            ])
        );
    }

    #[test]
    fn finalize_flattens_final_array() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "only" }));
        let result = state.finalize_with(Some(json!([
            { "text": "extra-1" },
            { "text": "extra-2" }
        ])));
        assert_eq!(
            result,
            json!([
                { "text": "only" },
                { "text": "extra-1" },
                { "text": "extra-2" }
            ])
        );
    }

    #[test]
    fn missing_operation_reports_node_and_component() {
        let message = missing_operation_message("missing-op", None);
        assert!(
            message.contains("missing operation for node `missing-op`"),
            "unexpected message: {message}"
        );
        assert!(
            message.contains("(component `component.exec`)"),
            "unexpected message: {message}"
        );
    }

    #[test]
    fn missing_operation_mentions_mapping_hint() {
        let message = missing_operation_message("missing-op-hint", Some("render"));
        assert!(
            message.contains("missing operation for node `missing-op-hint`"),
            "unexpected message: {message}"
        );
        assert!(
            message.contains("Found operation in input.mapping (`render`)"),
            "unexpected message: {message}"
        );
    }

    /// Observer that records the id of every started node and the output of
    /// every finished node.
    struct CountingObserver {
        starts: Mutex<Vec<String>>,
        ends: Mutex<Vec<Value>>,
    }

    impl CountingObserver {
        fn new() -> Self {
            Self {
                starts: Mutex::new(Vec::new()),
                ends: Mutex::new(Vec::new()),
            }
        }
    }

    impl ExecutionObserver for CountingObserver {
        fn on_node_start(&self, event: &NodeEvent<'_>) {
            self.starts.lock().unwrap().push(event.node_id.to_string());
        }

        fn on_node_end(&self, _event: &NodeEvent<'_>, output: &Value) {
            self.ends.lock().unwrap().push(output.clone());
        }

        fn on_node_error(&self, _event: &NodeEvent<'_>, _error: &dyn StdError) {}
    }

    #[test]
    fn emits_end_event_for_successful_node() {
        let node_id = NodeId::from_str("emit").unwrap();
        let node = Node {
            id: node_id.clone(),
            component: FlowComponentRef {
                id: "emit.log".parse().unwrap(),
                pack_alias: None,
                operation: None,
            },
            input: InputMapping {
                mapping: json!({ "message": "logged" }),
            },
            output: OutputMapping {
                mapping: Value::Null,
            },
            routing: Routing::End,
            telemetry: TelemetryHints::default(),
        };
        let mut nodes = indexmap::IndexMap::default();
        nodes.insert(node_id.clone(), node);
        let flow = Flow {
            schema_version: "1.0".into(),
            id: FlowId::from_str("emit.flow").unwrap(),
            kind: FlowKind::Messaging,
            entrypoints: BTreeMap::from([(
                "default".to_string(),
                Value::String(node_id.to_string()),
            )]),
            nodes,
            metadata: Default::default(),
        };
        let host_flow = HostFlow::from(flow);

        // Seed the cache directly so the engine resolves the flow without any pack.
        let engine = minimal_engine();
        engine
            .flow_cache
            .write()
            .insert("emit.flow".to_string(), host_flow);

        let observer = CountingObserver::new();
        let ctx = FlowContext {
            tenant: "demo",
            flow_id: "emit.flow",
            node_id: None,
            tool: None,
            action: None,
            session_id: None,
            provider_id: None,
            retry_config: single_attempt_retry(),
            observer: Some(&observer),
            mocks: None,
        };

        let rt = Runtime::new().unwrap();
        let result = rt.block_on(engine.execute(ctx, Value::Null)).unwrap();
        assert!(matches!(result.status, FlowStatus::Completed));

        let starts = observer.starts.lock().unwrap();
        let ends = observer.ends.lock().unwrap();
        assert_eq!(starts.len(), 1);
        assert_eq!(ends.len(), 1);
        assert_eq!(ends[0], json!({ "message": "logged" }));
    }
}
1371
1372use tracing::Instrument;
1373
1374pub struct FlowContext<'a> {
1375    pub tenant: &'a str,
1376    pub flow_id: &'a str,
1377    pub node_id: Option<&'a str>,
1378    pub tool: Option<&'a str>,
1379    pub action: Option<&'a str>,
1380    pub session_id: Option<&'a str>,
1381    pub provider_id: Option<&'a str>,
1382    pub retry_config: RetryConfig,
1383    pub observer: Option<&'a dyn ExecutionObserver>,
1384    pub mocks: Option<&'a MockLayer>,
1385}
1386
/// Retry settings carried by a flow invocation.
///
/// `Debug` is derived so the settings can be logged and used in assertion
/// messages; the type remains `Copy`.
#[derive(Copy, Clone, Debug)]
pub struct RetryConfig {
    /// Maximum number of attempts.
    pub max_attempts: u32,
    /// Base delay between attempts, in milliseconds.
    pub base_delay_ms: u64,
}
1392
1393fn should_retry(err: &anyhow::Error) -> bool {
1394    let lower = err.to_string().to_lowercase();
1395    lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
1396}
1397
1398impl From<FlowRetryConfig> for RetryConfig {
1399    fn from(value: FlowRetryConfig) -> Self {
1400        Self {
1401            max_attempts: value.max_attempts.max(1),
1402            base_delay_ms: value.base_delay_ms.max(50),
1403        }
1404    }
1405}