Skip to main content

greentic_runner_host/runner/
engine.rs

1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use super::templating::{TemplateOptions, render_template_value};
18use crate::config::{FlowRetryConfig, HostConfig};
19use crate::pack::{FlowDescriptor, PackRuntime};
20use crate::runner::invocation::{InvocationMeta, build_invocation_envelope};
21use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
22#[cfg(feature = "fault-injection")]
23use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
24use crate::validate::{
25    ValidationConfig, ValidationIssue, ValidationMode, validate_component_envelope,
26    validate_tool_envelope,
27};
28use greentic_types::{Flow, Node, NodeId, Routing};
29
30pub struct FlowEngine {
31    packs: Vec<Arc<PackRuntime>>,
32    flows: Vec<FlowDescriptor>,
33    flow_sources: HashMap<FlowKey, usize>,
34    flow_cache: RwLock<HashMap<FlowKey, HostFlow>>,
35    default_env: String,
36    validation: ValidationConfig,
37}
38
/// Hashable identity of a flow: the owning pack id plus the flow id.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct FlowKey {
    /// Identifier of the pack the flow belongs to.
    pack_id: String,
    /// Identifier of the flow within that pack.
    flow_id: String,
}
44
45#[derive(Clone, Debug, Serialize, Deserialize)]
46pub struct FlowSnapshot {
47    pub pack_id: String,
48    pub flow_id: String,
49    pub next_node: String,
50    pub state: ExecutionState,
51}
52
53#[derive(Clone, Debug)]
54pub struct FlowWait {
55    pub reason: Option<String>,
56    pub snapshot: FlowSnapshot,
57}
58
59#[derive(Clone, Debug)]
60pub enum FlowStatus {
61    Completed,
62    Waiting(Box<FlowWait>),
63}
64
65#[derive(Clone, Debug)]
66pub struct FlowExecution {
67    pub output: Value,
68    pub status: FlowStatus,
69}
70
71#[derive(Clone, Debug)]
72struct HostFlow {
73    id: String,
74    start: Option<NodeId>,
75    nodes: IndexMap<NodeId, HostNode>,
76}
77
78#[derive(Clone, Debug)]
79pub struct HostNode {
80    kind: NodeKind,
81    /// Backwards-compatible component label for observers/transcript.
82    pub component: String,
83    component_id: String,
84    operation_name: Option<String>,
85    operation_in_mapping: Option<String>,
86    payload_expr: Value,
87    routing: Routing,
88}
89
90impl HostNode {
91    pub fn component_id(&self) -> &str {
92        &self.component_id
93    }
94
95    pub fn operation_name(&self) -> Option<&str> {
96        self.operation_name.as_deref()
97    }
98
99    pub fn operation_in_mapping(&self) -> Option<&str> {
100        self.operation_in_mapping.as_deref()
101    }
102}
103
104#[derive(Clone, Debug)]
105enum NodeKind {
106    Exec { target_component: String },
107    PackComponent { component_ref: String },
108    ProviderInvoke,
109    FlowCall,
110    BuiltinEmit { kind: EmitKind },
111    Wait,
112}
113
/// Flavour of a builtin emit node.
#[derive(Clone, Debug)]
enum EmitKind {
    /// Log emit.
    Log,
    /// Response emit.
    Response,
    /// Any other emit, carrying its component name (only logged at debug level).
    Other(String),
}
120
/// Node-level values that take part in resolving a component call.
struct ComponentOverrides<'a> {
    /// Component reference fixed by the node; preferred over the payload's.
    component: Option<&'a str>,
    /// Operation name declared on the node, if any.
    operation: Option<&'a str>,
}
125
126struct ComponentCall {
127    component_ref: String,
128    operation: String,
129    input: Value,
130    config: Value,
131}
132
133impl FlowExecution {
134    fn completed(output: Value) -> Self {
135        Self {
136            output,
137            status: FlowStatus::Completed,
138        }
139    }
140
141    fn waiting(output: Value, wait: FlowWait) -> Self {
142        Self {
143            output,
144            status: FlowStatus::Waiting(Box::new(wait)),
145        }
146    }
147}
148
149impl FlowEngine {
150    pub async fn new(packs: Vec<Arc<PackRuntime>>, config: Arc<HostConfig>) -> Result<Self> {
151        let mut flow_sources: HashMap<FlowKey, usize> = HashMap::new();
152        let mut descriptors = Vec::new();
153        let mut bindings = HashMap::new();
154        for pack in &config.pack_bindings {
155            bindings.insert(pack.pack_id.clone(), pack.flows.clone());
156        }
157        let enforce_bindings = !bindings.is_empty();
158        for (idx, pack) in packs.iter().enumerate() {
159            let pack_id = pack.metadata().pack_id.clone();
160            if enforce_bindings && !bindings.contains_key(&pack_id) {
161                bail!("no gtbind entries found for pack {}", pack_id);
162            }
163            let flows = pack.list_flows().await?;
164            let allowed = bindings.get(&pack_id).map(|flows| {
165                flows
166                    .iter()
167                    .cloned()
168                    .collect::<std::collections::HashSet<_>>()
169            });
170            let mut seen = std::collections::HashSet::new();
171            for flow in flows {
172                if let Some(ref allow) = allowed
173                    && !allow.contains(&flow.id)
174                {
175                    continue;
176                }
177                seen.insert(flow.id.clone());
178                tracing::info!(
179                    flow_id = %flow.id,
180                    flow_type = %flow.flow_type,
181                    pack_id = %flow.pack_id,
182                    pack_index = idx,
183                    "registered flow"
184                );
185                flow_sources.insert(
186                    FlowKey {
187                        pack_id: flow.pack_id.clone(),
188                        flow_id: flow.id.clone(),
189                    },
190                    idx,
191                );
192                descriptors.retain(|existing: &FlowDescriptor| {
193                    !(existing.id == flow.id && existing.pack_id == flow.pack_id)
194                });
195                descriptors.push(flow);
196            }
197            if let Some(allow) = allowed {
198                let missing = allow.difference(&seen).cloned().collect::<Vec<_>>();
199                if !missing.is_empty() {
200                    bail!(
201                        "gtbind flow ids missing in pack {}: {}",
202                        pack_id,
203                        missing.join(", ")
204                    );
205                }
206            }
207        }
208
209        let mut flow_map = HashMap::new();
210        for flow in &descriptors {
211            let pack_id = flow.pack_id.clone();
212            if let Some(&pack_idx) = flow_sources.get(&FlowKey {
213                pack_id: pack_id.clone(),
214                flow_id: flow.id.clone(),
215            }) {
216                let pack_clone = Arc::clone(&packs[pack_idx]);
217                let flow_id = flow.id.clone();
218                let task_flow_id = flow_id.clone();
219                match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
220                    Ok(Ok(loaded_flow)) => {
221                        flow_map.insert(
222                            FlowKey {
223                                pack_id: pack_id.clone(),
224                                flow_id,
225                            },
226                            HostFlow::from(loaded_flow),
227                        );
228                    }
229                    Ok(Err(err)) => {
230                        tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
231                    }
232                    Err(err) => {
233                        tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
234                    }
235                }
236            }
237        }
238
239        Ok(Self {
240            packs,
241            flows: descriptors,
242            flow_sources,
243            flow_cache: RwLock::new(flow_map),
244            default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
245            validation: config.validation.clone(),
246        })
247    }
248
249    async fn get_or_load_flow(&self, pack_id: &str, flow_id: &str) -> Result<HostFlow> {
250        let key = FlowKey {
251            pack_id: pack_id.to_string(),
252            flow_id: flow_id.to_string(),
253        };
254        if let Some(flow) = self.flow_cache.read().get(&key).cloned() {
255            return Ok(flow);
256        }
257
258        let pack_idx = *self
259            .flow_sources
260            .get(&key)
261            .with_context(|| format!("flow {pack_id}:{flow_id} not registered"))?;
262        let pack = Arc::clone(&self.packs[pack_idx]);
263        let flow_id_owned = flow_id.to_string();
264        let task_flow_id = flow_id_owned.clone();
265        let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
266            .await
267            .context("failed to join flow metadata task")??;
268        let host_flow = HostFlow::from(flow);
269        self.flow_cache.write().insert(
270            FlowKey {
271                pack_id: pack_id.to_string(),
272                flow_id: flow_id_owned.clone(),
273            },
274            host_flow.clone(),
275        );
276        Ok(host_flow)
277    }
278
279    pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
280        let span = tracing::info_span!(
281            "flow.execute",
282            tenant = tracing::field::Empty,
283            flow_id = tracing::field::Empty,
284            node_id = tracing::field::Empty,
285            tool = tracing::field::Empty,
286            action = tracing::field::Empty
287        );
288        annotate_span(
289            &span,
290            &FlowSpanAttributes {
291                tenant: ctx.tenant,
292                flow_id: ctx.flow_id,
293                node_id: ctx.node_id,
294                tool: ctx.tool,
295                action: ctx.action,
296            },
297        );
298        set_flow_context(
299            &self.default_env,
300            ctx.tenant,
301            ctx.flow_id,
302            ctx.node_id,
303            ctx.provider_id,
304            ctx.session_id,
305        );
306        let retry_config = ctx.retry_config;
307        let original_input = input;
308        let mut ctx = ctx;
309        async move {
310            let mut attempt = 0u32;
311            loop {
312                attempt += 1;
313                ctx.attempt = attempt;
314                #[cfg(feature = "fault-injection")]
315                {
316                    let fault_ctx = FaultContext {
317                        pack_id: ctx.pack_id,
318                        flow_id: ctx.flow_id,
319                        node_id: ctx.node_id,
320                        attempt: ctx.attempt,
321                    };
322                    maybe_fail(FaultPoint::Timeout, fault_ctx)
323                        .map_err(|err| anyhow!(err.to_string()))?;
324                }
325                match self.execute_once(&ctx, original_input.clone()).await {
326                    Ok(value) => return Ok(value),
327                    Err(err) => {
328                        if attempt >= retry_config.max_attempts || !should_retry(&err) {
329                            return Err(err);
330                        }
331                        let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
332                        tracing::warn!(
333                            tenant = ctx.tenant,
334                            flow_id = ctx.flow_id,
335                            attempt,
336                            max_attempts = retry_config.max_attempts,
337                            delay_ms = delay,
338                            error = %err,
339                            "transient flow execution failure, backing off"
340                        );
341                        tokio::time::sleep(Duration::from_millis(delay)).await;
342                    }
343                }
344            }
345        }
346        .instrument(span)
347        .await
348    }
349
350    pub async fn resume(
351        &self,
352        ctx: FlowContext<'_>,
353        snapshot: FlowSnapshot,
354        input: Value,
355    ) -> Result<FlowExecution> {
356        if snapshot.pack_id != ctx.pack_id {
357            bail!(
358                "snapshot pack {} does not match requested {}",
359                snapshot.pack_id,
360                ctx.pack_id
361            );
362        }
363        if snapshot.flow_id != ctx.flow_id {
364            bail!(
365                "snapshot flow {} does not match requested {}",
366                snapshot.flow_id,
367                ctx.flow_id
368            );
369        }
370        let flow_ir = self.get_or_load_flow(ctx.pack_id, ctx.flow_id).await?;
371        let mut state = snapshot.state;
372        state.replace_input(input);
373        state.ensure_entry();
374        self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
375            .await
376    }
377
378    async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
379        let flow_ir = self.get_or_load_flow(ctx.pack_id, ctx.flow_id).await?;
380        let state = ExecutionState::new(input);
381        self.drive_flow(ctx, flow_ir, state, None).await
382    }
383
384    async fn drive_flow(
385        &self,
386        ctx: &FlowContext<'_>,
387        flow_ir: HostFlow,
388        mut state: ExecutionState,
389        resume_from: Option<String>,
390    ) -> Result<FlowExecution> {
391        let mut current = match resume_from {
392            Some(node) => NodeId::from_str(&node)
393                .with_context(|| format!("invalid resume node id `{node}`"))?,
394            None => flow_ir
395                .start
396                .clone()
397                .or_else(|| flow_ir.nodes.keys().next().cloned())
398                .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
399        };
400
401        loop {
402            let node = flow_ir
403                .nodes
404                .get(&current)
405                .with_context(|| format!("node {} not found", current.as_str()))?;
406
407            let payload_template = node.payload_expr.clone();
408            let prev = state
409                .last_output
410                .as_ref()
411                .cloned()
412                .unwrap_or_else(|| Value::Object(JsonMap::new()));
413            let ctx_value = template_context(&state, prev);
414            #[cfg(feature = "fault-injection")]
415            {
416                let fault_ctx = FaultContext {
417                    pack_id: ctx.pack_id,
418                    flow_id: ctx.flow_id,
419                    node_id: Some(current.as_str()),
420                    attempt: ctx.attempt,
421                };
422                maybe_fail(FaultPoint::TemplateRender, fault_ctx)
423                    .map_err(|err| anyhow!(err.to_string()))?;
424            }
425            let payload =
426                render_template_value(&payload_template, &ctx_value, TemplateOptions::default())
427                    .context("failed to render node input template")?;
428            let observed_payload = payload.clone();
429            let node_id = current.clone();
430            let event = NodeEvent {
431                context: ctx,
432                node_id: node_id.as_str(),
433                node,
434                payload: &observed_payload,
435            };
436            if let Some(observer) = ctx.observer {
437                observer.on_node_start(&event);
438            }
439            let dispatch = self
440                .dispatch_node(ctx, node_id.as_str(), node, &mut state, payload, &event)
441                .await;
442            let DispatchOutcome {
443                output,
444                wait_reason,
445            } = match dispatch {
446                Ok(outcome) => outcome,
447                Err(err) => {
448                    if let Some(observer) = ctx.observer {
449                        observer.on_node_error(&event, err.as_ref());
450                    }
451                    return Err(err);
452                }
453            };
454
455            state.nodes.insert(node_id.clone().into(), output.clone());
456            state.last_output = Some(output.payload.clone());
457            if let Some(observer) = ctx.observer {
458                observer.on_node_end(&event, &output.payload);
459            }
460
461            let (next, should_exit) = match &node.routing {
462                Routing::Next { node_id } => (Some(node_id.clone()), false),
463                Routing::End | Routing::Reply => (None, true),
464                Routing::Branch { default, .. } => (default.clone(), default.is_none()),
465                Routing::Custom(raw) => {
466                    tracing::warn!(
467                        flow_id = %flow_ir.id,
468                        node_id = %node_id,
469                        routing = ?raw,
470                        "unsupported routing; terminating flow"
471                    );
472                    (None, true)
473                }
474            };
475
476            if let Some(wait_reason) = wait_reason {
477                let resume_target = next.clone().ok_or_else(|| {
478                    anyhow!(
479                        "session.wait node {} requires a non-empty route",
480                        current.as_str()
481                    )
482                })?;
483                let mut snapshot_state = state.clone();
484                snapshot_state.clear_egress();
485                let snapshot = FlowSnapshot {
486                    pack_id: ctx.pack_id.to_string(),
487                    flow_id: ctx.flow_id.to_string(),
488                    next_node: resume_target.as_str().to_string(),
489                    state: snapshot_state,
490                };
491                let output_value = state.clone().finalize_with(None);
492                return Ok(FlowExecution::waiting(
493                    output_value,
494                    FlowWait {
495                        reason: Some(wait_reason),
496                        snapshot,
497                    },
498                ));
499            }
500
501            if should_exit {
502                return Ok(FlowExecution::completed(
503                    state.finalize_with(Some(output.payload.clone())),
504                ));
505            }
506
507            match next {
508                Some(n) => current = n,
509                None => {
510                    return Ok(FlowExecution::completed(
511                        state.finalize_with(Some(output.payload.clone())),
512                    ));
513                }
514            }
515        }
516    }
517
518    async fn dispatch_node(
519        &self,
520        ctx: &FlowContext<'_>,
521        node_id: &str,
522        node: &HostNode,
523        state: &mut ExecutionState,
524        payload: Value,
525        event: &NodeEvent<'_>,
526    ) -> Result<DispatchOutcome> {
527        match &node.kind {
528            NodeKind::Exec { target_component } => self
529                .execute_component_exec(
530                    ctx,
531                    node_id,
532                    node,
533                    payload,
534                    event,
535                    ComponentOverrides {
536                        component: Some(target_component.as_str()),
537                        operation: node.operation_name.as_deref(),
538                    },
539                )
540                .await
541                .map(DispatchOutcome::complete),
542            NodeKind::PackComponent { component_ref } => self
543                .execute_component_call(ctx, node_id, node, payload, component_ref.as_str(), event)
544                .await
545                .map(DispatchOutcome::complete),
546            NodeKind::FlowCall => self
547                .execute_flow_call(ctx, payload)
548                .await
549                .map(DispatchOutcome::complete),
550            NodeKind::ProviderInvoke => self
551                .execute_provider_invoke(ctx, node_id, state, payload, event)
552                .await
553                .map(DispatchOutcome::complete),
554            NodeKind::BuiltinEmit { kind } => {
555                match kind {
556                    EmitKind::Log | EmitKind::Response => {}
557                    EmitKind::Other(component) => {
558                        tracing::debug!(%component, "handling emit.* as builtin");
559                    }
560                }
561                state.push_egress(payload.clone());
562                Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
563            }
564            NodeKind::Wait => {
565                let reason = extract_wait_reason(&payload);
566                Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
567            }
568        }
569    }
570
571    async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
572        #[derive(Deserialize)]
573        struct FlowCallPayload {
574            #[serde(alias = "flow")]
575            flow_id: String,
576            #[serde(default)]
577            input: Value,
578        }
579
580        let call: FlowCallPayload =
581            serde_json::from_value(payload).context("invalid payload for flow.call node")?;
582        if call.flow_id.trim().is_empty() {
583            bail!("flow.call requires a non-empty flow_id");
584        }
585
586        let sub_input = if call.input.is_null() {
587            Value::Null
588        } else {
589            call.input
590        };
591
592        let flow_id_owned = call.flow_id;
593        let action = "flow.call";
594        let sub_ctx = FlowContext {
595            tenant: ctx.tenant,
596            pack_id: ctx.pack_id,
597            flow_id: flow_id_owned.as_str(),
598            node_id: None,
599            tool: ctx.tool,
600            action: Some(action),
601            session_id: ctx.session_id,
602            provider_id: ctx.provider_id,
603            retry_config: ctx.retry_config,
604            attempt: ctx.attempt,
605            observer: ctx.observer,
606            mocks: ctx.mocks,
607        };
608
609        let execution = Box::pin(self.execute(sub_ctx, sub_input))
610            .await
611            .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
612        match execution.status {
613            FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
614            FlowStatus::Waiting(wait) => bail!(
615                "flow.call cannot pause (flow {} waiting {:?})",
616                flow_id_owned,
617                wait.reason
618            ),
619        }
620    }
621
622    async fn execute_component_exec(
623        &self,
624        ctx: &FlowContext<'_>,
625        node_id: &str,
626        node: &HostNode,
627        payload: Value,
628        event: &NodeEvent<'_>,
629        overrides: ComponentOverrides<'_>,
630    ) -> Result<NodeOutput> {
631        #[derive(Deserialize)]
632        struct ComponentPayload {
633            #[serde(default, alias = "component_ref", alias = "component")]
634            component: Option<String>,
635            #[serde(alias = "op")]
636            operation: Option<String>,
637            #[serde(default)]
638            input: Value,
639            #[serde(default)]
640            config: Value,
641        }
642
643        let payload: ComponentPayload =
644            serde_json::from_value(payload).context("invalid payload for component.exec")?;
645        let component_ref = overrides
646            .component
647            .map(str::to_string)
648            .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
649            .with_context(|| "component.exec requires a component_ref")?;
650        let operation = resolve_component_operation(
651            node_id,
652            node.component_id.as_str(),
653            payload.operation,
654            overrides.operation,
655            node.operation_in_mapping.as_deref(),
656        )?;
657        let call = ComponentCall {
658            component_ref,
659            operation,
660            input: payload.input,
661            config: payload.config,
662        };
663
664        self.invoke_component_call(ctx, node_id, call, event).await
665    }
666
667    async fn execute_component_call(
668        &self,
669        ctx: &FlowContext<'_>,
670        node_id: &str,
671        node: &HostNode,
672        payload: Value,
673        component_ref: &str,
674        event: &NodeEvent<'_>,
675    ) -> Result<NodeOutput> {
676        let payload_operation = extract_operation_from_mapping(&payload);
677        let (input, config) = split_operation_payload(payload);
678        let operation = resolve_component_operation(
679            node_id,
680            node.component_id.as_str(),
681            payload_operation,
682            node.operation_name.as_deref(),
683            node.operation_in_mapping.as_deref(),
684        )?;
685        let call = ComponentCall {
686            component_ref: component_ref.to_string(),
687            operation,
688            input,
689            config,
690        };
691        self.invoke_component_call(ctx, node_id, call, event).await
692    }
693
694    async fn invoke_component_call(
695        &self,
696        ctx: &FlowContext<'_>,
697        node_id: &str,
698        call: ComponentCall,
699        event: &NodeEvent<'_>,
700    ) -> Result<NodeOutput> {
701        self.validate_component(ctx, event, &call)?;
702        // Runtime owns ctx; flows must not embed ctx, even if they provide envelopes.
703        let meta = InvocationMeta {
704            env: &self.default_env,
705            tenant: ctx.tenant,
706            flow_id: ctx.flow_id,
707            node_id: Some(node_id),
708            provider_id: ctx.provider_id,
709            session_id: ctx.session_id,
710            attempt: ctx.attempt,
711        };
712        let invocation_envelope =
713            build_invocation_envelope(meta, call.operation.as_str(), call.input)
714                .context("build invocation envelope for component call")?;
715        let input_json = serde_json::to_string(&invocation_envelope)?;
716        let config_json = if call.config.is_null() {
717            None
718        } else {
719            Some(serde_json::to_string(&call.config)?)
720        };
721
722        let key = FlowKey {
723            pack_id: ctx.pack_id.to_string(),
724            flow_id: ctx.flow_id.to_string(),
725        };
726        let pack_idx = *self.flow_sources.get(&key).with_context(|| {
727            format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
728        })?;
729        let pack = Arc::clone(&self.packs[pack_idx]);
730        let exec_ctx = component_exec_ctx(ctx, node_id);
731        #[cfg(feature = "fault-injection")]
732        {
733            let fault_ctx = FaultContext {
734                pack_id: ctx.pack_id,
735                flow_id: ctx.flow_id,
736                node_id: Some(node_id),
737                attempt: ctx.attempt,
738            };
739            maybe_fail(FaultPoint::BeforeComponentCall, fault_ctx)
740                .map_err(|err| anyhow!(err.to_string()))?;
741        }
742        let value = pack
743            .invoke_component(
744                call.component_ref.as_str(),
745                exec_ctx,
746                call.operation.as_str(),
747                config_json,
748                input_json,
749            )
750            .await?;
751        #[cfg(feature = "fault-injection")]
752        {
753            let fault_ctx = FaultContext {
754                pack_id: ctx.pack_id,
755                flow_id: ctx.flow_id,
756                node_id: Some(node_id),
757                attempt: ctx.attempt,
758            };
759            maybe_fail(FaultPoint::AfterComponentCall, fault_ctx)
760                .map_err(|err| anyhow!(err.to_string()))?;
761        }
762
763        if let Some((code, message)) = component_error(&value) {
764            bail!(
765                "component {} failed: {}: {}",
766                call.component_ref,
767                code,
768                message
769            );
770        }
771        Ok(NodeOutput::new(value))
772    }
773
774    async fn execute_provider_invoke(
775        &self,
776        ctx: &FlowContext<'_>,
777        node_id: &str,
778        state: &ExecutionState,
779        payload: Value,
780        event: &NodeEvent<'_>,
781    ) -> Result<NodeOutput> {
782        #[derive(Deserialize)]
783        struct ProviderPayload {
784            #[serde(default)]
785            provider_id: Option<String>,
786            #[serde(default)]
787            provider_type: Option<String>,
788            #[serde(default, alias = "operation")]
789            op: Option<String>,
790            #[serde(default)]
791            input: Value,
792            #[serde(default)]
793            in_map: Value,
794            #[serde(default)]
795            out_map: Value,
796            #[serde(default)]
797            err_map: Value,
798        }
799
800        let payload: ProviderPayload =
801            serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
802        let op = payload
803            .op
804            .as_deref()
805            .filter(|v| !v.trim().is_empty())
806            .with_context(|| "provider.invoke requires an op")?
807            .to_string();
808
809        let prev = state
810            .last_output
811            .as_ref()
812            .cloned()
813            .unwrap_or_else(|| Value::Object(JsonMap::new()));
814        let base_ctx = template_context(state, prev);
815
816        let input_value = if !payload.in_map.is_null() {
817            let mut ctx_value = base_ctx.clone();
818            if let Value::Object(ref mut map) = ctx_value {
819                map.insert("input".into(), payload.input.clone());
820                map.insert("result".into(), payload.input.clone());
821            }
822            render_template_value(
823                &payload.in_map,
824                &ctx_value,
825                TemplateOptions {
826                    allow_pointer: true,
827                },
828            )
829            .context("failed to render provider.invoke in_map")?
830        } else if !payload.input.is_null() {
831            payload.input
832        } else {
833            Value::Null
834        };
835        let input_json = serde_json::to_vec(&input_value)?;
836
837        self.validate_tool(
838            ctx,
839            event,
840            payload.provider_id.as_deref(),
841            payload.provider_type.as_deref(),
842            &op,
843            &input_value,
844        )?;
845
846        let key = FlowKey {
847            pack_id: ctx.pack_id.to_string(),
848            flow_id: ctx.flow_id.to_string(),
849        };
850        let pack_idx = *self.flow_sources.get(&key).with_context(|| {
851            format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
852        })?;
853        let pack = Arc::clone(&self.packs[pack_idx]);
854        let binding = pack.resolve_provider(
855            payload.provider_id.as_deref(),
856            payload.provider_type.as_deref(),
857        )?;
858        let exec_ctx = component_exec_ctx(ctx, node_id);
859        #[cfg(feature = "fault-injection")]
860        {
861            let fault_ctx = FaultContext {
862                pack_id: ctx.pack_id,
863                flow_id: ctx.flow_id,
864                node_id: Some(node_id),
865                attempt: ctx.attempt,
866            };
867            maybe_fail(FaultPoint::BeforeToolCall, fault_ctx)
868                .map_err(|err| anyhow!(err.to_string()))?;
869        }
870        let result = pack
871            .invoke_provider(&binding, exec_ctx, &op, input_json)
872            .await?;
873        #[cfg(feature = "fault-injection")]
874        {
875            let fault_ctx = FaultContext {
876                pack_id: ctx.pack_id,
877                flow_id: ctx.flow_id,
878                node_id: Some(node_id),
879                attempt: ctx.attempt,
880            };
881            maybe_fail(FaultPoint::AfterToolCall, fault_ctx)
882                .map_err(|err| anyhow!(err.to_string()))?;
883        }
884
885        let output = if payload.out_map.is_null() {
886            result
887        } else {
888            let mut ctx_value = base_ctx;
889            if let Value::Object(ref mut map) = ctx_value {
890                map.insert("input".into(), result.clone());
891                map.insert("result".into(), result.clone());
892            }
893            render_template_value(
894                &payload.out_map,
895                &ctx_value,
896                TemplateOptions {
897                    allow_pointer: true,
898                },
899            )
900            .context("failed to render provider.invoke out_map")?
901        };
902        let _ = payload.err_map;
903        Ok(NodeOutput::new(output))
904    }
905
906    fn validate_component(
907        &self,
908        ctx: &FlowContext<'_>,
909        event: &NodeEvent<'_>,
910        call: &ComponentCall,
911    ) -> Result<()> {
912        if self.validation.mode == ValidationMode::Off {
913            return Ok(());
914        }
915        let mut metadata = JsonMap::new();
916        metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
917        if let Some(id) = ctx.session_id {
918            metadata.insert("session".to_string(), json!({ "id": id }));
919        }
920        let envelope = json!({
921            "component_id": call.component_ref,
922            "operation": call.operation,
923            "input": call.input,
924            "config": call.config,
925            "metadata": Value::Object(metadata),
926        });
927        let issues = validate_component_envelope(&envelope);
928        self.report_validation(ctx, event, "component", issues)
929    }
930
931    fn validate_tool(
932        &self,
933        ctx: &FlowContext<'_>,
934        event: &NodeEvent<'_>,
935        provider_id: Option<&str>,
936        provider_type: Option<&str>,
937        operation: &str,
938        input: &Value,
939    ) -> Result<()> {
940        if self.validation.mode == ValidationMode::Off {
941            return Ok(());
942        }
943        let tool_id = provider_id.or(provider_type).unwrap_or("provider.invoke");
944        let mut metadata = JsonMap::new();
945        metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
946        if let Some(id) = ctx.session_id {
947            metadata.insert("session".to_string(), json!({ "id": id }));
948        }
949        let envelope = json!({
950            "tool_id": tool_id,
951            "operation": operation,
952            "input": input,
953            "metadata": Value::Object(metadata),
954        });
955        let issues = validate_tool_envelope(&envelope);
956        self.report_validation(ctx, event, "tool", issues)
957    }
958
959    fn report_validation(
960        &self,
961        ctx: &FlowContext<'_>,
962        event: &NodeEvent<'_>,
963        kind: &str,
964        issues: Vec<ValidationIssue>,
965    ) -> Result<()> {
966        if issues.is_empty() {
967            return Ok(());
968        }
969        if let Some(observer) = ctx.observer {
970            observer.on_validation(event, &issues);
971        }
972        match self.validation.mode {
973            ValidationMode::Warn => {
974                tracing::warn!(
975                    tenant = ctx.tenant,
976                    flow_id = ctx.flow_id,
977                    node_id = event.node_id,
978                    kind,
979                    issues = ?issues,
980                    "invocation envelope validation issues"
981                );
982                Ok(())
983            }
984            ValidationMode::Error => {
985                tracing::error!(
986                    tenant = ctx.tenant,
987                    flow_id = ctx.flow_id,
988                    node_id = event.node_id,
989                    kind,
990                    issues = ?issues,
991                    "invocation envelope validation failed"
992                );
993                bail!("invocation_validation_failed");
994            }
995            ValidationMode::Off => Ok(()),
996        }
997    }
998
999    pub fn flows(&self) -> &[FlowDescriptor] {
1000        &self.flows
1001    }
1002
1003    pub fn flow_by_key(&self, pack_id: &str, flow_id: &str) -> Option<&FlowDescriptor> {
1004        self.flows
1005            .iter()
1006            .find(|descriptor| descriptor.pack_id == pack_id && descriptor.id == flow_id)
1007    }
1008
1009    pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
1010        let mut matches = self
1011            .flows
1012            .iter()
1013            .filter(|descriptor| descriptor.flow_type == flow_type);
1014        let first = matches.next()?;
1015        if matches.next().is_some() {
1016            return None;
1017        }
1018        Some(first)
1019    }
1020
1021    pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
1022        let mut matches = self
1023            .flows
1024            .iter()
1025            .filter(|descriptor| descriptor.id == flow_id);
1026        let first = matches.next()?;
1027        if matches.next().is_some() {
1028            return None;
1029        }
1030        Some(first)
1031    }
1032}
1033
/// Callbacks for observing node execution and validation during a flow run.
///
/// Implementors must be `Send + Sync`. Only `on_validation` has a default
/// (empty) body, so observers that do not care about validation issues can
/// leave it out.
pub trait ExecutionObserver: Send + Sync {
    /// Called for a node that is starting.
    // NOTE(review): the call site is outside this part of the file; presumably
    // invoked right before the node is dispatched — confirm.
    fn on_node_start(&self, event: &NodeEvent<'_>);
    /// Called for a node that finished, together with the node's output value.
    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
    /// Called for a node that failed, together with the error.
    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
    /// Called with the non-empty list of envelope validation issues found for
    /// an invocation. The default implementation ignores them.
    fn on_validation(&self, _event: &NodeEvent<'_>, _issues: &[ValidationIssue]) {}
}
1040
/// Borrowed description of a single node invocation, as passed to
/// [`ExecutionObserver`] callbacks and used when logging validation issues.
pub struct NodeEvent<'a> {
    /// Flow context the node runs in.
    pub context: &'a FlowContext<'a>,
    /// Identifier of the node within its flow.
    pub node_id: &'a str,
    /// Host-side definition of the node.
    pub node: &'a HostNode,
    /// Payload associated with the invocation.
    pub payload: &'a Value,
}
1047
/// Serializable state of a flow execution.
///
/// Every field falls back to its default when missing during
/// deserialization.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionState {
    /// Input the flow was originally started with; `new` seeds it from the
    /// initial input and `ensure_entry` back-fills it from `input` when null.
    #[serde(default)]
    entry: Value,
    /// Current input; can be swapped out via `replace_input`.
    #[serde(default)]
    input: Value,
    /// Outputs recorded per node, keyed by node id.
    #[serde(default)]
    nodes: HashMap<String, NodeOutput>,
    /// Payloads emitted so far; merged into the final result by `finalize_with`.
    #[serde(default)]
    egress: Vec<Value>,
    /// Omitted from the serialized form when `None`.
    // NOTE(review): not read or written in this part of the file; presumably the
    // most recent node output — confirm against the execution loop.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    last_output: Option<Value>,
}
1061
1062impl ExecutionState {
1063    fn new(input: Value) -> Self {
1064        Self {
1065            entry: input.clone(),
1066            input,
1067            nodes: HashMap::new(),
1068            egress: Vec::new(),
1069            last_output: None,
1070        }
1071    }
1072
1073    fn ensure_entry(&mut self) {
1074        if self.entry.is_null() {
1075            self.entry = self.input.clone();
1076        }
1077    }
1078
1079    fn context(&self) -> Value {
1080        let mut nodes = JsonMap::new();
1081        for (id, output) in &self.nodes {
1082            nodes.insert(
1083                id.clone(),
1084                json!({
1085                    "ok": output.ok,
1086                    "payload": output.payload.clone(),
1087                    "meta": output.meta.clone(),
1088                }),
1089            );
1090        }
1091        json!({
1092            "entry": self.entry.clone(),
1093            "input": self.input.clone(),
1094            "nodes": nodes,
1095        })
1096    }
1097
1098    fn outputs_map(&self) -> JsonMap<String, Value> {
1099        let mut outputs = JsonMap::new();
1100        for (id, output) in &self.nodes {
1101            outputs.insert(id.clone(), output.payload.clone());
1102        }
1103        outputs
1104    }
1105    fn push_egress(&mut self, payload: Value) {
1106        self.egress.push(payload);
1107    }
1108
1109    fn replace_input(&mut self, input: Value) {
1110        self.input = input;
1111    }
1112
1113    fn clear_egress(&mut self) {
1114        self.egress.clear();
1115    }
1116
1117    fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
1118        if self.egress.is_empty() {
1119            return final_payload.unwrap_or(Value::Null);
1120        }
1121        let mut emitted = std::mem::take(&mut self.egress);
1122        if let Some(value) = final_payload {
1123            match value {
1124                Value::Null => {}
1125                Value::Array(items) => emitted.extend(items),
1126                other => emitted.push(other),
1127            }
1128        }
1129        Value::Array(emitted)
1130    }
1131}
1132
/// Recorded result of one node: a success flag, the payload and metadata.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct NodeOutput {
    /// Success flag; `NodeOutput::new` always sets it to `true`.
    ok: bool,
    /// Value produced by the node.
    payload: Value,
    /// Extra metadata; `NodeOutput::new` leaves it null.
    meta: Value,
}
1139
1140impl NodeOutput {
1141    fn new(payload: Value) -> Self {
1142        Self {
1143            ok: true,
1144            payload,
1145            meta: Value::Null,
1146        }
1147    }
1148}
1149
/// Result of dispatching one node: its output plus an optional wait reason.
struct DispatchOutcome {
    /// Output produced by the node.
    output: NodeOutput,
    /// Reason supplied through `DispatchOutcome::wait`; always `None` for
    /// outcomes built with `DispatchOutcome::complete`.
    wait_reason: Option<String>,
}
1154
1155impl DispatchOutcome {
1156    fn complete(output: NodeOutput) -> Self {
1157        Self {
1158            output,
1159            wait_reason: None,
1160        }
1161    }
1162
1163    fn wait(output: NodeOutput, reason: Option<String>) -> Self {
1164        Self {
1165            output,
1166            wait_reason: reason,
1167        }
1168    }
1169}
1170
1171fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
1172    ComponentExecCtx {
1173        tenant: ComponentTenantCtx {
1174            tenant: ctx.tenant.to_string(),
1175            team: None,
1176            user: ctx.provider_id.map(str::to_string),
1177            trace_id: None,
1178            correlation_id: ctx.session_id.map(str::to_string),
1179            deadline_unix_ms: None,
1180            attempt: ctx.attempt,
1181            idempotency_key: ctx.session_id.map(str::to_string),
1182        },
1183        flow_id: ctx.flow_id.to_string(),
1184        node_id: Some(node_id.to_string()),
1185    }
1186}
1187
1188fn component_error(value: &Value) -> Option<(String, String)> {
1189    let obj = value.as_object()?;
1190    let ok = obj.get("ok").and_then(Value::as_bool)?;
1191    if ok {
1192        return None;
1193    }
1194    let err = obj.get("error")?.as_object()?;
1195    let code = err
1196        .get("code")
1197        .and_then(Value::as_str)
1198        .unwrap_or("component_error");
1199    let message = err
1200        .get("message")
1201        .and_then(Value::as_str)
1202        .unwrap_or("component reported error");
1203    Some((code.to_string(), message.to_string()))
1204}
1205
1206fn extract_wait_reason(payload: &Value) -> Option<String> {
1207    match payload {
1208        Value::String(s) => Some(s.clone()),
1209        Value::Object(map) => map
1210            .get("reason")
1211            .and_then(Value::as_str)
1212            .map(|value| value.to_string()),
1213        _ => None,
1214    }
1215}
1216
1217fn template_context(state: &ExecutionState, prev: Value) -> Value {
1218    let entry = if state.entry.is_null() {
1219        Value::Object(JsonMap::new())
1220    } else {
1221        state.entry.clone()
1222    };
1223    let mut ctx = JsonMap::new();
1224    ctx.insert("entry".into(), entry);
1225    ctx.insert("prev".into(), prev);
1226    ctx.insert("node".into(), Value::Object(state.outputs_map()));
1227    ctx.insert("state".into(), state.context());
1228    Value::Object(ctx)
1229}
1230
1231impl From<Flow> for HostFlow {
1232    fn from(value: Flow) -> Self {
1233        let mut nodes = IndexMap::new();
1234        for (id, node) in value.nodes {
1235            nodes.insert(id.clone(), HostNode::from(node));
1236        }
1237        let start = value
1238            .entrypoints
1239            .get("default")
1240            .and_then(Value::as_str)
1241            .and_then(|id| NodeId::from_str(id).ok())
1242            .or_else(|| nodes.keys().next().cloned());
1243        Self {
1244            id: value.id.as_str().to_string(),
1245            start,
1246            nodes,
1247        }
1248    }
1249}
1250
1251impl From<Node> for HostNode {
1252    fn from(node: Node) -> Self {
1253        let component_ref = node.component.id.as_str().to_string();
1254        let raw_operation = node.component.operation.clone();
1255        let operation_in_mapping = extract_operation_from_mapping(&node.input.mapping);
1256        let operation_is_component_exec = raw_operation.as_deref() == Some("component.exec");
1257        let operation_is_emit = raw_operation
1258            .as_deref()
1259            .map(|op| op.starts_with("emit."))
1260            .unwrap_or(false);
1261        let is_component_exec = component_ref == "component.exec" || operation_is_component_exec;
1262
1263        let kind = if is_component_exec {
1264            let target = if component_ref == "component.exec" {
1265                if let Some(op) = raw_operation
1266                    .as_deref()
1267                    .filter(|op| op.starts_with("emit."))
1268                {
1269                    op.to_string()
1270                } else {
1271                    extract_target_component(&node.input.mapping)
1272                        .unwrap_or_else(|| "component.exec".to_string())
1273                }
1274            } else {
1275                extract_target_component(&node.input.mapping)
1276                    .unwrap_or_else(|| component_ref.clone())
1277            };
1278            if target.starts_with("emit.") {
1279                NodeKind::BuiltinEmit {
1280                    kind: emit_kind_from_ref(&target),
1281                }
1282            } else {
1283                NodeKind::Exec {
1284                    target_component: target,
1285                }
1286            }
1287        } else if operation_is_emit {
1288            NodeKind::BuiltinEmit {
1289                kind: emit_kind_from_ref(raw_operation.as_deref().unwrap_or("emit.log")),
1290            }
1291        } else {
1292            match component_ref.as_str() {
1293                "flow.call" => NodeKind::FlowCall,
1294                "provider.invoke" => NodeKind::ProviderInvoke,
1295                "session.wait" => NodeKind::Wait,
1296                comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
1297                    kind: emit_kind_from_ref(comp),
1298                },
1299                other => NodeKind::PackComponent {
1300                    component_ref: other.to_string(),
1301                },
1302            }
1303        };
1304        let component_label = match &kind {
1305            NodeKind::Exec { .. } => "component.exec".to_string(),
1306            NodeKind::PackComponent { component_ref } => component_ref.clone(),
1307            NodeKind::ProviderInvoke => "provider.invoke".to_string(),
1308            NodeKind::FlowCall => "flow.call".to_string(),
1309            NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
1310            NodeKind::Wait => "session.wait".to_string(),
1311        };
1312        let operation_name = if is_component_exec && operation_is_component_exec {
1313            None
1314        } else {
1315            raw_operation.clone()
1316        };
1317        let payload_expr = match kind {
1318            NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
1319            _ => node.input.mapping.clone(),
1320        };
1321        Self {
1322            kind,
1323            component: component_label,
1324            component_id: if is_component_exec {
1325                "component.exec".to_string()
1326            } else {
1327                component_ref
1328            },
1329            operation_name,
1330            operation_in_mapping,
1331            payload_expr,
1332            routing: node.routing,
1333        }
1334    }
1335}
1336
1337fn extract_target_component(payload: &Value) -> Option<String> {
1338    match payload {
1339        Value::Object(map) => map
1340            .get("component")
1341            .or_else(|| map.get("component_ref"))
1342            .and_then(Value::as_str)
1343            .map(|s| s.to_string()),
1344        _ => None,
1345    }
1346}
1347
1348fn extract_operation_from_mapping(payload: &Value) -> Option<String> {
1349    match payload {
1350        Value::Object(map) => map
1351            .get("operation")
1352            .or_else(|| map.get("op"))
1353            .and_then(Value::as_str)
1354            .map(str::trim)
1355            .filter(|value| !value.is_empty())
1356            .map(|value| value.to_string()),
1357        _ => None,
1358    }
1359}
1360
1361fn extract_emit_payload(payload: &Value) -> Value {
1362    if let Value::Object(map) = payload {
1363        if let Some(input) = map.get("input") {
1364            return input.clone();
1365        }
1366        if let Some(inner) = map.get("payload") {
1367            return inner.clone();
1368        }
1369    }
1370    payload.clone()
1371}
1372
1373fn split_operation_payload(payload: Value) -> (Value, Value) {
1374    if let Value::Object(mut map) = payload.clone()
1375        && map.contains_key("input")
1376    {
1377        let input = map.remove("input").unwrap_or(Value::Null);
1378        let config = map.remove("config").unwrap_or(Value::Null);
1379        let legacy_only = map.keys().all(|key| {
1380            matches!(
1381                key.as_str(),
1382                "operation" | "op" | "component" | "component_ref"
1383            )
1384        });
1385        if legacy_only {
1386            return (input, config);
1387        }
1388    }
1389    (payload, Value::Null)
1390}
1391
1392fn resolve_component_operation(
1393    node_id: &str,
1394    component_label: &str,
1395    payload_operation: Option<String>,
1396    operation_override: Option<&str>,
1397    operation_in_mapping: Option<&str>,
1398) -> Result<String> {
1399    if let Some(op) = operation_override
1400        .map(str::trim)
1401        .filter(|value| !value.is_empty())
1402    {
1403        return Ok(op.to_string());
1404    }
1405
1406    if let Some(op) = payload_operation
1407        .as_deref()
1408        .map(str::trim)
1409        .filter(|value| !value.is_empty())
1410    {
1411        return Ok(op.to_string());
1412    }
1413
1414    let mut message = format!(
1415        "missing operation for node `{}` (component `{}`); expected node.component.operation to be set",
1416        node_id, component_label,
1417    );
1418    if let Some(found) = operation_in_mapping {
1419        message.push_str(&format!(
1420            ". Found operation in input.mapping (`{}`) but this is not used; pack compiler must preserve node.component.operation.",
1421            found
1422        ));
1423    }
1424    bail!(message);
1425}
1426
1427fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
1428    match component_ref {
1429        "emit.log" => EmitKind::Log,
1430        "emit.response" => EmitKind::Response,
1431        other => EmitKind::Other(other.to_string()),
1432    }
1433}
1434
1435fn emit_ref_from_kind(kind: &EmitKind) -> String {
1436    match kind {
1437        EmitKind::Log => "emit.log".to_string(),
1438        EmitKind::Response => "emit.response".to_string(),
1439        EmitKind::Other(other) => other.clone(),
1440    }
1441}
1442
#[cfg(test)]
mod tests {
    use super::*;
    use crate::validate::{ValidationConfig, ValidationMode};
    use greentic_types::{
        Flow, FlowComponentRef, FlowId, FlowKind, InputMapping, Node, NodeId, OutputMapping,
        Routing, TelemetryHints,
    };
    use serde_json::json;
    use std::collections::BTreeMap;
    use std::str::FromStr;
    use std::sync::Mutex;
    use tokio::runtime::Runtime;

    /// Engine without packs or flows and with validation switched off.
    fn minimal_engine() -> FlowEngine {
        FlowEngine {
            packs: Vec::new(),
            flows: Vec::new(),
            flow_sources: HashMap::new(),
            flow_cache: RwLock::new(HashMap::new()),
            default_env: "local".to_string(),
            validation: ValidationConfig {
                mode: ValidationMode::Off,
            },
        }
    }

    /// Flow context for `test-pack` on its first (and only allowed) attempt,
    /// with every optional field unset except the ones passed in.
    fn test_context<'a>(
        tenant: &'a str,
        flow_id: &'a str,
        node_id: Option<&'a str>,
        observer: Option<&'a dyn ExecutionObserver>,
    ) -> FlowContext<'a> {
        FlowContext {
            tenant,
            pack_id: "test-pack",
            flow_id,
            node_id,
            tool: None,
            action: None,
            session_id: None,
            provider_id: None,
            retry_config: RetryConfig {
                max_attempts: 1,
                base_delay_ms: 1,
            },
            attempt: 1,
            observer,
            mocks: None,
        }
    }

    /// Runs `execute_component_exec` for a `component.exec` node that has no
    /// operation and returns the text of the resulting error.
    fn missing_operation_message(node_id: &str, operation_in_mapping: Option<&str>) -> String {
        let engine = minimal_engine();
        let rt = Runtime::new().unwrap();
        let ctx = test_context("tenant", "flow", Some(node_id), None);
        let node = HostNode {
            kind: NodeKind::Exec {
                target_component: "qa.process".into(),
            },
            component: "component.exec".into(),
            component_id: "component.exec".into(),
            operation_name: None,
            operation_in_mapping: operation_in_mapping.map(str::to_string),
            payload_expr: Value::Null,
            routing: Routing::End,
        };
        let payload = json!({ "component": "qa.process" });
        let event = NodeEvent {
            context: &ctx,
            node_id,
            node: &node,
            payload: &payload,
        };
        rt.block_on(engine.execute_component_exec(
            &ctx,
            node_id,
            &node,
            payload.clone(),
            &event,
            ComponentOverrides {
                component: None,
                operation: None,
            },
        ))
        .unwrap_err()
        .to_string()
    }

    // NOTE: despite its name this test only checks that the state context
    // exposes recorded node outputs; it does not render a template.
    #[test]
    fn templating_renders_with_partials_and_data() {
        let mut state = ExecutionState::new(json!({ "city": "London" }));
        state.nodes.insert(
            "forecast".to_string(),
            NodeOutput::new(json!({ "temp": "20C" })),
        );

        // templating context includes node outputs for runner-side payload rendering.
        let ctx = state.context();
        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
    }

    #[test]
    fn finalize_wraps_emitted_payloads() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "first" }));
        state.push_egress(json!({ "text": "second" }));
        let result = state.finalize_with(Some(json!({ "text": "final" })));
        assert_eq!(
            result,
            json!([
                { "text": "first" },
                { "text": "second" },
                { "text": "final" }
            ])
        );
    }

    #[test]
    fn finalize_flattens_final_array() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "only" }));
        let result = state.finalize_with(Some(json!([
            { "text": "extra-1" },
            { "text": "extra-2" }
        ])));
        assert_eq!(
            result,
            json!([
                { "text": "only" },
                { "text": "extra-1" },
                { "text": "extra-2" }
            ])
        );
    }

    #[test]
    fn missing_operation_reports_node_and_component() {
        let message = missing_operation_message("missing-op", None);
        assert!(
            message.contains("missing operation for node `missing-op`"),
            "unexpected message: {message}"
        );
        assert!(
            message.contains("(component `component.exec`)"),
            "unexpected message: {message}"
        );
    }

    #[test]
    fn missing_operation_mentions_mapping_hint() {
        let message = missing_operation_message("missing-op-hint", Some("render"));
        assert!(
            message.contains("missing operation for node `missing-op-hint`"),
            "unexpected message: {message}"
        );
        assert!(
            message.contains("Found operation in input.mapping (`render`)"),
            "unexpected message: {message}"
        );
    }

    /// Observer that records the id of every started node and the output of
    /// every finished node.
    struct CountingObserver {
        starts: Mutex<Vec<String>>,
        ends: Mutex<Vec<Value>>,
    }

    impl CountingObserver {
        fn new() -> Self {
            Self {
                starts: Mutex::new(Vec::new()),
                ends: Mutex::new(Vec::new()),
            }
        }
    }

    impl ExecutionObserver for CountingObserver {
        fn on_node_start(&self, event: &NodeEvent<'_>) {
            self.starts.lock().unwrap().push(event.node_id.to_string());
        }

        fn on_node_end(&self, _event: &NodeEvent<'_>, output: &Value) {
            self.ends.lock().unwrap().push(output.clone());
        }

        fn on_node_error(&self, _event: &NodeEvent<'_>, _error: &dyn StdError) {}
    }

    #[test]
    fn emits_end_event_for_successful_node() {
        // A flow made of a single `emit.log` node that is also the default entrypoint.
        let node_id = NodeId::from_str("emit").unwrap();
        let node = Node {
            id: node_id.clone(),
            component: FlowComponentRef {
                id: "emit.log".parse().unwrap(),
                pack_alias: None,
                operation: None,
            },
            input: InputMapping {
                mapping: json!({ "message": "logged" }),
            },
            output: OutputMapping {
                mapping: Value::Null,
            },
            routing: Routing::End,
            telemetry: TelemetryHints::default(),
        };
        let mut nodes = indexmap::IndexMap::default();
        nodes.insert(node_id.clone(), node);
        let flow = Flow {
            schema_version: "1.0".into(),
            id: FlowId::from_str("emit.flow").unwrap(),
            kind: FlowKind::Messaging,
            entrypoints: BTreeMap::from([(
                "default".to_string(),
                Value::String(node_id.to_string()),
            )]),
            nodes,
            metadata: Default::default(),
        };

        // Pre-populate the flow cache so the engine can run the flow without a pack.
        let engine = minimal_engine();
        engine.flow_cache.write().insert(
            FlowKey {
                pack_id: "test-pack".to_string(),
                flow_id: "emit.flow".to_string(),
            },
            HostFlow::from(flow),
        );

        let observer = CountingObserver::new();
        let ctx = test_context("demo", "emit.flow", None, Some(&observer));

        let rt = Runtime::new().unwrap();
        let result = rt.block_on(engine.execute(ctx, Value::Null)).unwrap();
        assert!(matches!(result.status, FlowStatus::Completed));

        let starts = observer.starts.lock().unwrap();
        let ends = observer.ends.lock().unwrap();
        assert_eq!(starts.len(), 1);
        assert_eq!(ends.len(), 1);
        assert_eq!(ends[0], json!({ "message": "logged" }));
    }
}
1753
1754use tracing::Instrument;
1755
/// Borrowed, per-invocation context that accompanies a flow execution.
pub struct FlowContext<'a> {
    /// Tenant the flow runs for; included in validation metadata and logs.
    pub tenant: &'a str,
    /// Pack that owns the flow; together with `flow_id` it identifies the flow.
    pub pack_id: &'a str,
    /// Identifier of the flow being executed.
    pub flow_id: &'a str,
    /// Optional node identifier.
    // NOTE(review): presumably the node to start or resume from — confirm with callers.
    pub node_id: Option<&'a str>,
    /// Optional tool name.
    pub tool: Option<&'a str>,
    /// Optional action name.
    pub action: Option<&'a str>,
    /// Optional session id; when present it is added to validation metadata
    /// and forwarded to components as correlation id and idempotency key.
    pub session_id: Option<&'a str>,
    /// Optional provider id; forwarded to components as the `user` field.
    pub provider_id: Option<&'a str>,
    /// Retry policy for this execution.
    pub retry_config: RetryConfig,
    /// Attempt number; forwarded to components.
    pub attempt: u32,
    /// Optional observer notified about node and validation events.
    pub observer: Option<&'a dyn ExecutionObserver>,
    /// Optional mock layer.
    pub mocks: Option<&'a MockLayer>,
}
1770
/// Retry policy for a flow execution.
///
/// When converted from `FlowRetryConfig`, `max_attempts` is raised to at
/// least 1 and `base_delay_ms` to at least 50.
#[derive(Copy, Clone, Debug)]
pub struct RetryConfig {
    /// Maximum number of attempts.
    pub max_attempts: u32,
    /// Base delay between attempts, in milliseconds.
    pub base_delay_ms: u64,
}
1776
1777fn should_retry(err: &anyhow::Error) -> bool {
1778    let lower = err.to_string().to_lowercase();
1779    lower.contains("transient")
1780        || lower.contains("unavailable")
1781        || lower.contains("internal")
1782        || lower.contains("timeout")
1783}
1784
1785impl From<FlowRetryConfig> for RetryConfig {
1786    fn from(value: FlowRetryConfig) -> Self {
1787        Self {
1788            max_attempts: value.max_attempts.max(1),
1789            base_delay_ms: value.base_delay_ms.max(50),
1790        }
1791    }
1792}