Skip to main content

greentic_runner_host/runner/
engine.rs

1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use super::templating::{TemplateOptions, render_template_value};
18use crate::config::{FlowRetryConfig, HostConfig};
19use crate::pack::{FlowDescriptor, PackRuntime};
20use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
21#[cfg(feature = "fault-injection")]
22use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
23use crate::validate::{
24    ValidationConfig, ValidationIssue, ValidationMode, validate_component_envelope,
25    validate_tool_envelope,
26};
27use greentic_types::{Flow, Node, NodeId, Routing};
28
29pub struct FlowEngine {
30    packs: Vec<Arc<PackRuntime>>,
31    flows: Vec<FlowDescriptor>,
32    flow_sources: HashMap<FlowKey, usize>,
33    flow_cache: RwLock<HashMap<FlowKey, HostFlow>>,
34    default_env: String,
35    validation: ValidationConfig,
36}
37
/// Identifies a flow by the pack that provides it and the flow's own id.
///
/// Used as the hash-map key for both the flow-to-pack lookup and the flow cache.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct FlowKey {
    /// Id of the pack providing the flow.
    pack_id: String,
    /// Id of the flow within that pack.
    flow_id: String,
}
43
44#[derive(Clone, Debug, Serialize, Deserialize)]
45pub struct FlowSnapshot {
46    pub pack_id: String,
47    pub flow_id: String,
48    pub next_node: String,
49    pub state: ExecutionState,
50}
51
52#[derive(Clone, Debug)]
53pub struct FlowWait {
54    pub reason: Option<String>,
55    pub snapshot: FlowSnapshot,
56}
57
58#[derive(Clone, Debug)]
59pub enum FlowStatus {
60    Completed,
61    Waiting(Box<FlowWait>),
62}
63
64#[derive(Clone, Debug)]
65pub struct FlowExecution {
66    pub output: Value,
67    pub status: FlowStatus,
68}
69
70#[derive(Clone, Debug)]
71struct HostFlow {
72    id: String,
73    start: Option<NodeId>,
74    nodes: IndexMap<NodeId, HostNode>,
75}
76
77#[derive(Clone, Debug)]
78pub struct HostNode {
79    kind: NodeKind,
80    /// Backwards-compatible component label for observers/transcript.
81    pub component: String,
82    component_id: String,
83    operation_name: Option<String>,
84    operation_in_mapping: Option<String>,
85    payload_expr: Value,
86    routing: Routing,
87}
88
89impl HostNode {
90    pub fn component_id(&self) -> &str {
91        &self.component_id
92    }
93
94    pub fn operation_name(&self) -> Option<&str> {
95        self.operation_name.as_deref()
96    }
97
98    pub fn operation_in_mapping(&self) -> Option<&str> {
99        self.operation_in_mapping.as_deref()
100    }
101}
102
103#[derive(Clone, Debug)]
104enum NodeKind {
105    Exec { target_component: String },
106    PackComponent { component_ref: String },
107    ProviderInvoke,
108    FlowCall,
109    BuiltinEmit { kind: EmitKind },
110    Wait,
111}
112
/// Flavor of a built-in emit node.
#[derive(Debug, Clone)]
enum EmitKind {
    /// Log emit.
    Log,
    /// Response emit.
    Response,
    /// Any other emit component, identified by its name (only logged at debug level).
    Other(String),
}
119
/// Node-level values handed to `execute_component_exec` alongside the payload.
struct ComponentOverrides<'a> {
    /// Component to call; when set it is used instead of the payload's component.
    component: Option<&'a str>,
    /// Operation declared on the node, forwarded to operation resolution.
    operation: Option<&'a str>,
}
124
125struct ComponentCall {
126    component_ref: String,
127    operation: String,
128    input: Value,
129    config: Value,
130}
131
132impl FlowExecution {
133    fn completed(output: Value) -> Self {
134        Self {
135            output,
136            status: FlowStatus::Completed,
137        }
138    }
139
140    fn waiting(output: Value, wait: FlowWait) -> Self {
141        Self {
142            output,
143            status: FlowStatus::Waiting(Box::new(wait)),
144        }
145    }
146}
147
148impl FlowEngine {
149    pub async fn new(packs: Vec<Arc<PackRuntime>>, config: Arc<HostConfig>) -> Result<Self> {
150        let mut flow_sources: HashMap<FlowKey, usize> = HashMap::new();
151        let mut descriptors = Vec::new();
152        let mut bindings = HashMap::new();
153        for pack in &config.pack_bindings {
154            bindings.insert(pack.pack_id.clone(), pack.flows.clone());
155        }
156        let enforce_bindings = !bindings.is_empty();
157        for (idx, pack) in packs.iter().enumerate() {
158            let pack_id = pack.metadata().pack_id.clone();
159            if enforce_bindings && !bindings.contains_key(&pack_id) {
160                bail!("no gtbind entries found for pack {}", pack_id);
161            }
162            let flows = pack.list_flows().await?;
163            let allowed = bindings.get(&pack_id).map(|flows| {
164                flows
165                    .iter()
166                    .cloned()
167                    .collect::<std::collections::HashSet<_>>()
168            });
169            let mut seen = std::collections::HashSet::new();
170            for flow in flows {
171                if let Some(ref allow) = allowed
172                    && !allow.contains(&flow.id)
173                {
174                    continue;
175                }
176                seen.insert(flow.id.clone());
177                tracing::info!(
178                    flow_id = %flow.id,
179                    flow_type = %flow.flow_type,
180                    pack_id = %flow.pack_id,
181                    pack_index = idx,
182                    "registered flow"
183                );
184                flow_sources.insert(
185                    FlowKey {
186                        pack_id: flow.pack_id.clone(),
187                        flow_id: flow.id.clone(),
188                    },
189                    idx,
190                );
191                descriptors.retain(|existing: &FlowDescriptor| {
192                    !(existing.id == flow.id && existing.pack_id == flow.pack_id)
193                });
194                descriptors.push(flow);
195            }
196            if let Some(allow) = allowed {
197                let missing = allow.difference(&seen).cloned().collect::<Vec<_>>();
198                if !missing.is_empty() {
199                    bail!(
200                        "gtbind flow ids missing in pack {}: {}",
201                        pack_id,
202                        missing.join(", ")
203                    );
204                }
205            }
206        }
207
208        let mut flow_map = HashMap::new();
209        for flow in &descriptors {
210            let pack_id = flow.pack_id.clone();
211            if let Some(&pack_idx) = flow_sources.get(&FlowKey {
212                pack_id: pack_id.clone(),
213                flow_id: flow.id.clone(),
214            }) {
215                let pack_clone = Arc::clone(&packs[pack_idx]);
216                let flow_id = flow.id.clone();
217                let task_flow_id = flow_id.clone();
218                match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
219                    Ok(Ok(loaded_flow)) => {
220                        flow_map.insert(
221                            FlowKey {
222                                pack_id: pack_id.clone(),
223                                flow_id,
224                            },
225                            HostFlow::from(loaded_flow),
226                        );
227                    }
228                    Ok(Err(err)) => {
229                        tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
230                    }
231                    Err(err) => {
232                        tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
233                    }
234                }
235            }
236        }
237
238        Ok(Self {
239            packs,
240            flows: descriptors,
241            flow_sources,
242            flow_cache: RwLock::new(flow_map),
243            default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
244            validation: config.validation.clone(),
245        })
246    }
247
248    async fn get_or_load_flow(&self, pack_id: &str, flow_id: &str) -> Result<HostFlow> {
249        let key = FlowKey {
250            pack_id: pack_id.to_string(),
251            flow_id: flow_id.to_string(),
252        };
253        if let Some(flow) = self.flow_cache.read().get(&key).cloned() {
254            return Ok(flow);
255        }
256
257        let pack_idx = *self
258            .flow_sources
259            .get(&key)
260            .with_context(|| format!("flow {pack_id}:{flow_id} not registered"))?;
261        let pack = Arc::clone(&self.packs[pack_idx]);
262        let flow_id_owned = flow_id.to_string();
263        let task_flow_id = flow_id_owned.clone();
264        let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
265            .await
266            .context("failed to join flow metadata task")??;
267        let host_flow = HostFlow::from(flow);
268        self.flow_cache.write().insert(
269            FlowKey {
270                pack_id: pack_id.to_string(),
271                flow_id: flow_id_owned.clone(),
272            },
273            host_flow.clone(),
274        );
275        Ok(host_flow)
276    }
277
278    pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
279        let span = tracing::info_span!(
280            "flow.execute",
281            tenant = tracing::field::Empty,
282            flow_id = tracing::field::Empty,
283            node_id = tracing::field::Empty,
284            tool = tracing::field::Empty,
285            action = tracing::field::Empty
286        );
287        annotate_span(
288            &span,
289            &FlowSpanAttributes {
290                tenant: ctx.tenant,
291                flow_id: ctx.flow_id,
292                node_id: ctx.node_id,
293                tool: ctx.tool,
294                action: ctx.action,
295            },
296        );
297        set_flow_context(
298            &self.default_env,
299            ctx.tenant,
300            ctx.flow_id,
301            ctx.node_id,
302            ctx.provider_id,
303            ctx.session_id,
304        );
305        let retry_config = ctx.retry_config;
306        let original_input = input;
307        let mut ctx = ctx;
308        async move {
309            let mut attempt = 0u32;
310            loop {
311                attempt += 1;
312                ctx.attempt = attempt;
313                #[cfg(feature = "fault-injection")]
314                {
315                    let fault_ctx = FaultContext {
316                        pack_id: ctx.pack_id,
317                        flow_id: ctx.flow_id,
318                        node_id: ctx.node_id,
319                        attempt: ctx.attempt,
320                    };
321                    maybe_fail(FaultPoint::Timeout, fault_ctx)
322                        .map_err(|err| anyhow!(err.to_string()))?;
323                }
324                match self.execute_once(&ctx, original_input.clone()).await {
325                    Ok(value) => return Ok(value),
326                    Err(err) => {
327                        if attempt >= retry_config.max_attempts || !should_retry(&err) {
328                            return Err(err);
329                        }
330                        let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
331                        tracing::warn!(
332                            tenant = ctx.tenant,
333                            flow_id = ctx.flow_id,
334                            attempt,
335                            max_attempts = retry_config.max_attempts,
336                            delay_ms = delay,
337                            error = %err,
338                            "transient flow execution failure, backing off"
339                        );
340                        tokio::time::sleep(Duration::from_millis(delay)).await;
341                    }
342                }
343            }
344        }
345        .instrument(span)
346        .await
347    }
348
349    pub async fn resume(
350        &self,
351        ctx: FlowContext<'_>,
352        snapshot: FlowSnapshot,
353        input: Value,
354    ) -> Result<FlowExecution> {
355        if snapshot.pack_id != ctx.pack_id {
356            bail!(
357                "snapshot pack {} does not match requested {}",
358                snapshot.pack_id,
359                ctx.pack_id
360            );
361        }
362        if snapshot.flow_id != ctx.flow_id {
363            bail!(
364                "snapshot flow {} does not match requested {}",
365                snapshot.flow_id,
366                ctx.flow_id
367            );
368        }
369        let flow_ir = self.get_or_load_flow(ctx.pack_id, ctx.flow_id).await?;
370        let mut state = snapshot.state;
371        state.replace_input(input);
372        state.ensure_entry();
373        self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
374            .await
375    }
376
377    async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
378        let flow_ir = self.get_or_load_flow(ctx.pack_id, ctx.flow_id).await?;
379        let state = ExecutionState::new(input);
380        self.drive_flow(ctx, flow_ir, state, None).await
381    }
382
383    async fn drive_flow(
384        &self,
385        ctx: &FlowContext<'_>,
386        flow_ir: HostFlow,
387        mut state: ExecutionState,
388        resume_from: Option<String>,
389    ) -> Result<FlowExecution> {
390        let mut current = match resume_from {
391            Some(node) => NodeId::from_str(&node)
392                .with_context(|| format!("invalid resume node id `{node}`"))?,
393            None => flow_ir
394                .start
395                .clone()
396                .or_else(|| flow_ir.nodes.keys().next().cloned())
397                .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
398        };
399
400        loop {
401            let node = flow_ir
402                .nodes
403                .get(&current)
404                .with_context(|| format!("node {} not found", current.as_str()))?;
405
406            let payload_template = node.payload_expr.clone();
407            let prev = state
408                .last_output
409                .as_ref()
410                .cloned()
411                .unwrap_or_else(|| Value::Object(JsonMap::new()));
412            let ctx_value = template_context(&state, prev);
413            #[cfg(feature = "fault-injection")]
414            {
415                let fault_ctx = FaultContext {
416                    pack_id: ctx.pack_id,
417                    flow_id: ctx.flow_id,
418                    node_id: Some(current.as_str()),
419                    attempt: ctx.attempt,
420                };
421                maybe_fail(FaultPoint::TemplateRender, fault_ctx)
422                    .map_err(|err| anyhow!(err.to_string()))?;
423            }
424            let payload =
425                render_template_value(&payload_template, &ctx_value, TemplateOptions::default())
426                    .context("failed to render node input template")?;
427            let observed_payload = payload.clone();
428            let node_id = current.clone();
429            let event = NodeEvent {
430                context: ctx,
431                node_id: node_id.as_str(),
432                node,
433                payload: &observed_payload,
434            };
435            if let Some(observer) = ctx.observer {
436                observer.on_node_start(&event);
437            }
438            let dispatch = self
439                .dispatch_node(ctx, node_id.as_str(), node, &mut state, payload, &event)
440                .await;
441            let DispatchOutcome {
442                output,
443                wait_reason,
444            } = match dispatch {
445                Ok(outcome) => outcome,
446                Err(err) => {
447                    if let Some(observer) = ctx.observer {
448                        observer.on_node_error(&event, err.as_ref());
449                    }
450                    return Err(err);
451                }
452            };
453
454            state.nodes.insert(node_id.clone().into(), output.clone());
455            state.last_output = Some(output.payload.clone());
456            if let Some(observer) = ctx.observer {
457                observer.on_node_end(&event, &output.payload);
458            }
459
460            let (next, should_exit) = match &node.routing {
461                Routing::Next { node_id } => (Some(node_id.clone()), false),
462                Routing::End | Routing::Reply => (None, true),
463                Routing::Branch { default, .. } => (default.clone(), default.is_none()),
464                Routing::Custom(raw) => {
465                    tracing::warn!(
466                        flow_id = %flow_ir.id,
467                        node_id = %node_id,
468                        routing = ?raw,
469                        "unsupported routing; terminating flow"
470                    );
471                    (None, true)
472                }
473            };
474
475            if let Some(wait_reason) = wait_reason {
476                let resume_target = next.clone().ok_or_else(|| {
477                    anyhow!(
478                        "session.wait node {} requires a non-empty route",
479                        current.as_str()
480                    )
481                })?;
482                let mut snapshot_state = state.clone();
483                snapshot_state.clear_egress();
484                let snapshot = FlowSnapshot {
485                    pack_id: ctx.pack_id.to_string(),
486                    flow_id: ctx.flow_id.to_string(),
487                    next_node: resume_target.as_str().to_string(),
488                    state: snapshot_state,
489                };
490                let output_value = state.clone().finalize_with(None);
491                return Ok(FlowExecution::waiting(
492                    output_value,
493                    FlowWait {
494                        reason: Some(wait_reason),
495                        snapshot,
496                    },
497                ));
498            }
499
500            if should_exit {
501                return Ok(FlowExecution::completed(
502                    state.finalize_with(Some(output.payload.clone())),
503                ));
504            }
505
506            match next {
507                Some(n) => current = n,
508                None => {
509                    return Ok(FlowExecution::completed(
510                        state.finalize_with(Some(output.payload.clone())),
511                    ));
512                }
513            }
514        }
515    }
516
517    async fn dispatch_node(
518        &self,
519        ctx: &FlowContext<'_>,
520        node_id: &str,
521        node: &HostNode,
522        state: &mut ExecutionState,
523        payload: Value,
524        event: &NodeEvent<'_>,
525    ) -> Result<DispatchOutcome> {
526        match &node.kind {
527            NodeKind::Exec { target_component } => self
528                .execute_component_exec(
529                    ctx,
530                    node_id,
531                    node,
532                    payload,
533                    event,
534                    ComponentOverrides {
535                        component: Some(target_component.as_str()),
536                        operation: node.operation_name.as_deref(),
537                    },
538                )
539                .await
540                .map(DispatchOutcome::complete),
541            NodeKind::PackComponent { component_ref } => self
542                .execute_component_call(ctx, node_id, node, payload, component_ref.as_str(), event)
543                .await
544                .map(DispatchOutcome::complete),
545            NodeKind::FlowCall => self
546                .execute_flow_call(ctx, payload)
547                .await
548                .map(DispatchOutcome::complete),
549            NodeKind::ProviderInvoke => self
550                .execute_provider_invoke(ctx, node_id, state, payload, event)
551                .await
552                .map(DispatchOutcome::complete),
553            NodeKind::BuiltinEmit { kind } => {
554                match kind {
555                    EmitKind::Log | EmitKind::Response => {}
556                    EmitKind::Other(component) => {
557                        tracing::debug!(%component, "handling emit.* as builtin");
558                    }
559                }
560                state.push_egress(payload.clone());
561                Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
562            }
563            NodeKind::Wait => {
564                let reason = extract_wait_reason(&payload);
565                Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
566            }
567        }
568    }
569
570    async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
571        #[derive(Deserialize)]
572        struct FlowCallPayload {
573            #[serde(alias = "flow")]
574            flow_id: String,
575            #[serde(default)]
576            input: Value,
577        }
578
579        let call: FlowCallPayload =
580            serde_json::from_value(payload).context("invalid payload for flow.call node")?;
581        if call.flow_id.trim().is_empty() {
582            bail!("flow.call requires a non-empty flow_id");
583        }
584
585        let sub_input = if call.input.is_null() {
586            Value::Null
587        } else {
588            call.input
589        };
590
591        let flow_id_owned = call.flow_id;
592        let action = "flow.call";
593        let sub_ctx = FlowContext {
594            tenant: ctx.tenant,
595            pack_id: ctx.pack_id,
596            flow_id: flow_id_owned.as_str(),
597            node_id: None,
598            tool: ctx.tool,
599            action: Some(action),
600            session_id: ctx.session_id,
601            provider_id: ctx.provider_id,
602            retry_config: ctx.retry_config,
603            attempt: ctx.attempt,
604            observer: ctx.observer,
605            mocks: ctx.mocks,
606        };
607
608        let execution = Box::pin(self.execute(sub_ctx, sub_input))
609            .await
610            .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
611        match execution.status {
612            FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
613            FlowStatus::Waiting(wait) => bail!(
614                "flow.call cannot pause (flow {} waiting {:?})",
615                flow_id_owned,
616                wait.reason
617            ),
618        }
619    }
620
621    async fn execute_component_exec(
622        &self,
623        ctx: &FlowContext<'_>,
624        node_id: &str,
625        node: &HostNode,
626        payload: Value,
627        event: &NodeEvent<'_>,
628        overrides: ComponentOverrides<'_>,
629    ) -> Result<NodeOutput> {
630        #[derive(Deserialize)]
631        struct ComponentPayload {
632            #[serde(default, alias = "component_ref", alias = "component")]
633            component: Option<String>,
634            #[serde(alias = "op")]
635            operation: Option<String>,
636            #[serde(default)]
637            input: Value,
638            #[serde(default)]
639            config: Value,
640        }
641
642        let payload: ComponentPayload =
643            serde_json::from_value(payload).context("invalid payload for component.exec")?;
644        let component_ref = overrides
645            .component
646            .map(str::to_string)
647            .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
648            .with_context(|| "component.exec requires a component_ref")?;
649        let operation = resolve_component_operation(
650            node_id,
651            node.component_id.as_str(),
652            payload.operation,
653            overrides.operation,
654            node.operation_in_mapping.as_deref(),
655        )?;
656        let call = ComponentCall {
657            component_ref,
658            operation,
659            input: payload.input,
660            config: payload.config,
661        };
662
663        self.invoke_component_call(ctx, node_id, call, event).await
664    }
665
666    async fn execute_component_call(
667        &self,
668        ctx: &FlowContext<'_>,
669        node_id: &str,
670        node: &HostNode,
671        payload: Value,
672        component_ref: &str,
673        event: &NodeEvent<'_>,
674    ) -> Result<NodeOutput> {
675        let payload_operation = extract_operation_from_mapping(&payload);
676        let (input, config) = split_operation_payload(payload);
677        let operation = resolve_component_operation(
678            node_id,
679            node.component_id.as_str(),
680            payload_operation,
681            node.operation_name.as_deref(),
682            node.operation_in_mapping.as_deref(),
683        )?;
684        let call = ComponentCall {
685            component_ref: component_ref.to_string(),
686            operation,
687            input,
688            config,
689        };
690        self.invoke_component_call(ctx, node_id, call, event).await
691    }
692
693    async fn invoke_component_call(
694        &self,
695        ctx: &FlowContext<'_>,
696        node_id: &str,
697        call: ComponentCall,
698        event: &NodeEvent<'_>,
699    ) -> Result<NodeOutput> {
700        self.validate_component(ctx, event, &call)?;
701        let input_json = serde_json::to_string(&call.input)?;
702        let config_json = if call.config.is_null() {
703            None
704        } else {
705            Some(serde_json::to_string(&call.config)?)
706        };
707
708        let key = FlowKey {
709            pack_id: ctx.pack_id.to_string(),
710            flow_id: ctx.flow_id.to_string(),
711        };
712        let pack_idx = *self.flow_sources.get(&key).with_context(|| {
713            format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
714        })?;
715        let pack = Arc::clone(&self.packs[pack_idx]);
716        let exec_ctx = component_exec_ctx(ctx, node_id);
717        #[cfg(feature = "fault-injection")]
718        {
719            let fault_ctx = FaultContext {
720                pack_id: ctx.pack_id,
721                flow_id: ctx.flow_id,
722                node_id: Some(node_id),
723                attempt: ctx.attempt,
724            };
725            maybe_fail(FaultPoint::BeforeComponentCall, fault_ctx)
726                .map_err(|err| anyhow!(err.to_string()))?;
727        }
728        let value = pack
729            .invoke_component(
730                call.component_ref.as_str(),
731                exec_ctx,
732                call.operation.as_str(),
733                config_json,
734                input_json,
735            )
736            .await?;
737        #[cfg(feature = "fault-injection")]
738        {
739            let fault_ctx = FaultContext {
740                pack_id: ctx.pack_id,
741                flow_id: ctx.flow_id,
742                node_id: Some(node_id),
743                attempt: ctx.attempt,
744            };
745            maybe_fail(FaultPoint::AfterComponentCall, fault_ctx)
746                .map_err(|err| anyhow!(err.to_string()))?;
747        }
748
749        if let Some((code, message)) = component_error(&value) {
750            bail!(
751                "component {} failed: {}: {}",
752                call.component_ref,
753                code,
754                message
755            );
756        }
757        Ok(NodeOutput::new(value))
758    }
759
760    async fn execute_provider_invoke(
761        &self,
762        ctx: &FlowContext<'_>,
763        node_id: &str,
764        state: &ExecutionState,
765        payload: Value,
766        event: &NodeEvent<'_>,
767    ) -> Result<NodeOutput> {
768        #[derive(Deserialize)]
769        struct ProviderPayload {
770            #[serde(default)]
771            provider_id: Option<String>,
772            #[serde(default)]
773            provider_type: Option<String>,
774            #[serde(default, alias = "operation")]
775            op: Option<String>,
776            #[serde(default)]
777            input: Value,
778            #[serde(default)]
779            in_map: Value,
780            #[serde(default)]
781            out_map: Value,
782            #[serde(default)]
783            err_map: Value,
784        }
785
786        let payload: ProviderPayload =
787            serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
788        let op = payload
789            .op
790            .as_deref()
791            .filter(|v| !v.trim().is_empty())
792            .with_context(|| "provider.invoke requires an op")?
793            .to_string();
794
795        let prev = state
796            .last_output
797            .as_ref()
798            .cloned()
799            .unwrap_or_else(|| Value::Object(JsonMap::new()));
800        let base_ctx = template_context(state, prev);
801
802        let input_value = if !payload.in_map.is_null() {
803            let mut ctx_value = base_ctx.clone();
804            if let Value::Object(ref mut map) = ctx_value {
805                map.insert("input".into(), payload.input.clone());
806                map.insert("result".into(), payload.input.clone());
807            }
808            render_template_value(
809                &payload.in_map,
810                &ctx_value,
811                TemplateOptions {
812                    allow_pointer: true,
813                },
814            )
815            .context("failed to render provider.invoke in_map")?
816        } else if !payload.input.is_null() {
817            payload.input
818        } else {
819            Value::Null
820        };
821        let input_json = serde_json::to_vec(&input_value)?;
822
823        self.validate_tool(
824            ctx,
825            event,
826            payload.provider_id.as_deref(),
827            payload.provider_type.as_deref(),
828            &op,
829            &input_value,
830        )?;
831
832        let key = FlowKey {
833            pack_id: ctx.pack_id.to_string(),
834            flow_id: ctx.flow_id.to_string(),
835        };
836        let pack_idx = *self.flow_sources.get(&key).with_context(|| {
837            format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
838        })?;
839        let pack = Arc::clone(&self.packs[pack_idx]);
840        let binding = pack.resolve_provider(
841            payload.provider_id.as_deref(),
842            payload.provider_type.as_deref(),
843        )?;
844        let exec_ctx = component_exec_ctx(ctx, node_id);
845        #[cfg(feature = "fault-injection")]
846        {
847            let fault_ctx = FaultContext {
848                pack_id: ctx.pack_id,
849                flow_id: ctx.flow_id,
850                node_id: Some(node_id),
851                attempt: ctx.attempt,
852            };
853            maybe_fail(FaultPoint::BeforeToolCall, fault_ctx)
854                .map_err(|err| anyhow!(err.to_string()))?;
855        }
856        let result = pack
857            .invoke_provider(&binding, exec_ctx, &op, input_json)
858            .await?;
859        #[cfg(feature = "fault-injection")]
860        {
861            let fault_ctx = FaultContext {
862                pack_id: ctx.pack_id,
863                flow_id: ctx.flow_id,
864                node_id: Some(node_id),
865                attempt: ctx.attempt,
866            };
867            maybe_fail(FaultPoint::AfterToolCall, fault_ctx)
868                .map_err(|err| anyhow!(err.to_string()))?;
869        }
870
871        let output = if payload.out_map.is_null() {
872            result
873        } else {
874            let mut ctx_value = base_ctx;
875            if let Value::Object(ref mut map) = ctx_value {
876                map.insert("input".into(), result.clone());
877                map.insert("result".into(), result.clone());
878            }
879            render_template_value(
880                &payload.out_map,
881                &ctx_value,
882                TemplateOptions {
883                    allow_pointer: true,
884                },
885            )
886            .context("failed to render provider.invoke out_map")?
887        };
888        let _ = payload.err_map;
889        Ok(NodeOutput::new(output))
890    }
891
892    fn validate_component(
893        &self,
894        ctx: &FlowContext<'_>,
895        event: &NodeEvent<'_>,
896        call: &ComponentCall,
897    ) -> Result<()> {
898        if self.validation.mode == ValidationMode::Off {
899            return Ok(());
900        }
901        let mut metadata = JsonMap::new();
902        metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
903        if let Some(id) = ctx.session_id {
904            metadata.insert("session".to_string(), json!({ "id": id }));
905        }
906        let envelope = json!({
907            "component_id": call.component_ref,
908            "operation": call.operation,
909            "input": call.input,
910            "config": call.config,
911            "metadata": Value::Object(metadata),
912        });
913        let issues = validate_component_envelope(&envelope);
914        self.report_validation(ctx, event, "component", issues)
915    }
916
917    fn validate_tool(
918        &self,
919        ctx: &FlowContext<'_>,
920        event: &NodeEvent<'_>,
921        provider_id: Option<&str>,
922        provider_type: Option<&str>,
923        operation: &str,
924        input: &Value,
925    ) -> Result<()> {
926        if self.validation.mode == ValidationMode::Off {
927            return Ok(());
928        }
929        let tool_id = provider_id.or(provider_type).unwrap_or("provider.invoke");
930        let mut metadata = JsonMap::new();
931        metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
932        if let Some(id) = ctx.session_id {
933            metadata.insert("session".to_string(), json!({ "id": id }));
934        }
935        let envelope = json!({
936            "tool_id": tool_id,
937            "operation": operation,
938            "input": input,
939            "metadata": Value::Object(metadata),
940        });
941        let issues = validate_tool_envelope(&envelope);
942        self.report_validation(ctx, event, "tool", issues)
943    }
944
945    fn report_validation(
946        &self,
947        ctx: &FlowContext<'_>,
948        event: &NodeEvent<'_>,
949        kind: &str,
950        issues: Vec<ValidationIssue>,
951    ) -> Result<()> {
952        if issues.is_empty() {
953            return Ok(());
954        }
955        if let Some(observer) = ctx.observer {
956            observer.on_validation(event, &issues);
957        }
958        match self.validation.mode {
959            ValidationMode::Warn => {
960                tracing::warn!(
961                    tenant = ctx.tenant,
962                    flow_id = ctx.flow_id,
963                    node_id = event.node_id,
964                    kind,
965                    issues = ?issues,
966                    "invocation envelope validation issues"
967                );
968                Ok(())
969            }
970            ValidationMode::Error => {
971                tracing::error!(
972                    tenant = ctx.tenant,
973                    flow_id = ctx.flow_id,
974                    node_id = event.node_id,
975                    kind,
976                    issues = ?issues,
977                    "invocation envelope validation failed"
978                );
979                bail!("invocation_validation_failed");
980            }
981            ValidationMode::Off => Ok(()),
982        }
983    }
984
985    pub fn flows(&self) -> &[FlowDescriptor] {
986        &self.flows
987    }
988
989    pub fn flow_by_key(&self, pack_id: &str, flow_id: &str) -> Option<&FlowDescriptor> {
990        self.flows
991            .iter()
992            .find(|descriptor| descriptor.pack_id == pack_id && descriptor.id == flow_id)
993    }
994
995    pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
996        let mut matches = self
997            .flows
998            .iter()
999            .filter(|descriptor| descriptor.flow_type == flow_type);
1000        let first = matches.next()?;
1001        if matches.next().is_some() {
1002            return None;
1003        }
1004        Some(first)
1005    }
1006
1007    pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
1008        let mut matches = self
1009            .flows
1010            .iter()
1011            .filter(|descriptor| descriptor.id == flow_id);
1012        let first = matches.next()?;
1013        if matches.next().is_some() {
1014            return None;
1015        }
1016        Some(first)
1017    }
1018}
1019
/// Callbacks an engine caller can register (through `FlowContext::observer`)
/// to watch node execution. Implementors must be `Send + Sync`.
1020pub trait ExecutionObserver: Send + Sync {
    /// Node-start hook. NOTE(review): the call site is outside this chunk;
    /// presumably invoked once before a node is dispatched — confirm.
1021    fn on_node_start(&self, event: &NodeEvent<'_>);
    /// Node-end hook receiving the node's output value. NOTE(review): the call
    /// site is outside this chunk; presumably invoked after a node succeeds.
1022    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
    /// Node-error hook receiving the error. NOTE(review): the call site is
    /// outside this chunk; presumably invoked when a node fails.
1023    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
    /// Called by `report_validation` with a non-empty list of envelope
    /// validation issues. The default implementation ignores them.
1024    fn on_validation(&self, _event: &NodeEvent<'_>, _issues: &[ValidationIssue]) {}
1025}
1026
/// Borrowed description of one node invocation, passed to
/// `ExecutionObserver` callbacks and to the validation helpers.
1027pub struct NodeEvent<'a> {
    /// Flow-level context the node runs under.
1028    pub context: &'a FlowContext<'a>,
    /// Identifier of the node; logged as `node_id` by validation reporting.
1029    pub node_id: &'a str,
    /// Host-side definition of the node.
1030    pub node: &'a HostNode,
    /// Payload associated with the invocation.
    /// NOTE(review): presumably the rendered node input — confirm at the call site.
1031    pub payload: &'a Value,
1032}
1033
1034#[derive(Clone, Debug, Serialize, Deserialize)]
1035pub struct ExecutionState {
1036    #[serde(default)]
1037    entry: Value,
1038    #[serde(default)]
1039    input: Value,
1040    #[serde(default)]
1041    nodes: HashMap<String, NodeOutput>,
1042    #[serde(default)]
1043    egress: Vec<Value>,
1044    #[serde(default, skip_serializing_if = "Option::is_none")]
1045    last_output: Option<Value>,
1046}
1047
1048impl ExecutionState {
1049    fn new(input: Value) -> Self {
1050        Self {
1051            entry: input.clone(),
1052            input,
1053            nodes: HashMap::new(),
1054            egress: Vec::new(),
1055            last_output: None,
1056        }
1057    }
1058
1059    fn ensure_entry(&mut self) {
1060        if self.entry.is_null() {
1061            self.entry = self.input.clone();
1062        }
1063    }
1064
1065    fn context(&self) -> Value {
1066        let mut nodes = JsonMap::new();
1067        for (id, output) in &self.nodes {
1068            nodes.insert(
1069                id.clone(),
1070                json!({
1071                    "ok": output.ok,
1072                    "payload": output.payload.clone(),
1073                    "meta": output.meta.clone(),
1074                }),
1075            );
1076        }
1077        json!({
1078            "entry": self.entry.clone(),
1079            "input": self.input.clone(),
1080            "nodes": nodes,
1081        })
1082    }
1083
1084    fn outputs_map(&self) -> JsonMap<String, Value> {
1085        let mut outputs = JsonMap::new();
1086        for (id, output) in &self.nodes {
1087            outputs.insert(id.clone(), output.payload.clone());
1088        }
1089        outputs
1090    }
1091    fn push_egress(&mut self, payload: Value) {
1092        self.egress.push(payload);
1093    }
1094
1095    fn replace_input(&mut self, input: Value) {
1096        self.input = input;
1097    }
1098
1099    fn clear_egress(&mut self) {
1100        self.egress.clear();
1101    }
1102
1103    fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
1104        if self.egress.is_empty() {
1105            return final_payload.unwrap_or(Value::Null);
1106        }
1107        let mut emitted = std::mem::take(&mut self.egress);
1108        if let Some(value) = final_payload {
1109            match value {
1110                Value::Null => {}
1111                Value::Array(items) => emitted.extend(items),
1112                other => emitted.push(other),
1113            }
1114        }
1115        Value::Array(emitted)
1116    }
1117}
1118
1119#[derive(Clone, Debug, Serialize, Deserialize)]
1120struct NodeOutput {
1121    ok: bool,
1122    payload: Value,
1123    meta: Value,
1124}
1125
1126impl NodeOutput {
1127    fn new(payload: Value) -> Self {
1128        Self {
1129            ok: true,
1130            payload,
1131            meta: Value::Null,
1132        }
1133    }
1134}
1135
1136struct DispatchOutcome {
1137    output: NodeOutput,
1138    wait_reason: Option<String>,
1139}
1140
1141impl DispatchOutcome {
1142    fn complete(output: NodeOutput) -> Self {
1143        Self {
1144            output,
1145            wait_reason: None,
1146        }
1147    }
1148
1149    fn wait(output: NodeOutput, reason: Option<String>) -> Self {
1150        Self {
1151            output,
1152            wait_reason: reason,
1153        }
1154    }
1155}
1156
1157fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
1158    ComponentExecCtx {
1159        tenant: ComponentTenantCtx {
1160            tenant: ctx.tenant.to_string(),
1161            team: None,
1162            user: ctx.provider_id.map(str::to_string),
1163            trace_id: None,
1164            correlation_id: ctx.session_id.map(str::to_string),
1165            deadline_unix_ms: None,
1166            attempt: ctx.attempt,
1167            idempotency_key: ctx.session_id.map(str::to_string),
1168        },
1169        flow_id: ctx.flow_id.to_string(),
1170        node_id: Some(node_id.to_string()),
1171    }
1172}
1173
1174fn component_error(value: &Value) -> Option<(String, String)> {
1175    let obj = value.as_object()?;
1176    let ok = obj.get("ok").and_then(Value::as_bool)?;
1177    if ok {
1178        return None;
1179    }
1180    let err = obj.get("error")?.as_object()?;
1181    let code = err
1182        .get("code")
1183        .and_then(Value::as_str)
1184        .unwrap_or("component_error");
1185    let message = err
1186        .get("message")
1187        .and_then(Value::as_str)
1188        .unwrap_or("component reported error");
1189    Some((code.to_string(), message.to_string()))
1190}
1191
1192fn extract_wait_reason(payload: &Value) -> Option<String> {
1193    match payload {
1194        Value::String(s) => Some(s.clone()),
1195        Value::Object(map) => map
1196            .get("reason")
1197            .and_then(Value::as_str)
1198            .map(|value| value.to_string()),
1199        _ => None,
1200    }
1201}
1202
1203fn template_context(state: &ExecutionState, prev: Value) -> Value {
1204    let entry = if state.entry.is_null() {
1205        Value::Object(JsonMap::new())
1206    } else {
1207        state.entry.clone()
1208    };
1209    let mut ctx = JsonMap::new();
1210    ctx.insert("entry".into(), entry);
1211    ctx.insert("prev".into(), prev);
1212    ctx.insert("node".into(), Value::Object(state.outputs_map()));
1213    ctx.insert("state".into(), state.context());
1214    Value::Object(ctx)
1215}
1216
1217impl From<Flow> for HostFlow {
1218    fn from(value: Flow) -> Self {
1219        let mut nodes = IndexMap::new();
1220        for (id, node) in value.nodes {
1221            nodes.insert(id.clone(), HostNode::from(node));
1222        }
1223        let start = value
1224            .entrypoints
1225            .get("default")
1226            .and_then(Value::as_str)
1227            .and_then(|id| NodeId::from_str(id).ok())
1228            .or_else(|| nodes.keys().next().cloned());
1229        Self {
1230            id: value.id.as_str().to_string(),
1231            start,
1232            nodes,
1233        }
1234    }
1235}
1236
1237impl From<Node> for HostNode {
1238    fn from(node: Node) -> Self {
1239        let component_ref = node.component.id.as_str().to_string();
1240        let raw_operation = node.component.operation.clone();
1241        let operation_in_mapping = extract_operation_from_mapping(&node.input.mapping);
1242        let operation_is_component_exec = raw_operation.as_deref() == Some("component.exec");
1243        let operation_is_emit = raw_operation
1244            .as_deref()
1245            .map(|op| op.starts_with("emit."))
1246            .unwrap_or(false);
1247        let is_component_exec = component_ref == "component.exec" || operation_is_component_exec;
1248
1249        let kind = if is_component_exec {
1250            let target = if component_ref == "component.exec" {
1251                if let Some(op) = raw_operation
1252                    .as_deref()
1253                    .filter(|op| op.starts_with("emit."))
1254                {
1255                    op.to_string()
1256                } else {
1257                    extract_target_component(&node.input.mapping)
1258                        .unwrap_or_else(|| "component.exec".to_string())
1259                }
1260            } else {
1261                extract_target_component(&node.input.mapping)
1262                    .unwrap_or_else(|| component_ref.clone())
1263            };
1264            if target.starts_with("emit.") {
1265                NodeKind::BuiltinEmit {
1266                    kind: emit_kind_from_ref(&target),
1267                }
1268            } else {
1269                NodeKind::Exec {
1270                    target_component: target,
1271                }
1272            }
1273        } else if operation_is_emit {
1274            NodeKind::BuiltinEmit {
1275                kind: emit_kind_from_ref(raw_operation.as_deref().unwrap_or("emit.log")),
1276            }
1277        } else {
1278            match component_ref.as_str() {
1279                "flow.call" => NodeKind::FlowCall,
1280                "provider.invoke" => NodeKind::ProviderInvoke,
1281                "session.wait" => NodeKind::Wait,
1282                comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
1283                    kind: emit_kind_from_ref(comp),
1284                },
1285                other => NodeKind::PackComponent {
1286                    component_ref: other.to_string(),
1287                },
1288            }
1289        };
1290        let component_label = match &kind {
1291            NodeKind::Exec { .. } => "component.exec".to_string(),
1292            NodeKind::PackComponent { component_ref } => component_ref.clone(),
1293            NodeKind::ProviderInvoke => "provider.invoke".to_string(),
1294            NodeKind::FlowCall => "flow.call".to_string(),
1295            NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
1296            NodeKind::Wait => "session.wait".to_string(),
1297        };
1298        let operation_name = if is_component_exec && operation_is_component_exec {
1299            None
1300        } else {
1301            raw_operation.clone()
1302        };
1303        let payload_expr = match kind {
1304            NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
1305            _ => node.input.mapping.clone(),
1306        };
1307        Self {
1308            kind,
1309            component: component_label,
1310            component_id: if is_component_exec {
1311                "component.exec".to_string()
1312            } else {
1313                component_ref
1314            },
1315            operation_name,
1316            operation_in_mapping,
1317            payload_expr,
1318            routing: node.routing,
1319        }
1320    }
1321}
1322
1323fn extract_target_component(payload: &Value) -> Option<String> {
1324    match payload {
1325        Value::Object(map) => map
1326            .get("component")
1327            .or_else(|| map.get("component_ref"))
1328            .and_then(Value::as_str)
1329            .map(|s| s.to_string()),
1330        _ => None,
1331    }
1332}
1333
1334fn extract_operation_from_mapping(payload: &Value) -> Option<String> {
1335    match payload {
1336        Value::Object(map) => map
1337            .get("operation")
1338            .or_else(|| map.get("op"))
1339            .and_then(Value::as_str)
1340            .map(str::trim)
1341            .filter(|value| !value.is_empty())
1342            .map(|value| value.to_string()),
1343        _ => None,
1344    }
1345}
1346
1347fn extract_emit_payload(payload: &Value) -> Value {
1348    if let Value::Object(map) = payload {
1349        if let Some(input) = map.get("input") {
1350            return input.clone();
1351        }
1352        if let Some(inner) = map.get("payload") {
1353            return inner.clone();
1354        }
1355    }
1356    payload.clone()
1357}
1358
1359fn split_operation_payload(payload: Value) -> (Value, Value) {
1360    if let Value::Object(mut map) = payload.clone()
1361        && map.contains_key("input")
1362    {
1363        let input = map.remove("input").unwrap_or(Value::Null);
1364        let config = map.remove("config").unwrap_or(Value::Null);
1365        let legacy_only = map.keys().all(|key| {
1366            matches!(
1367                key.as_str(),
1368                "operation" | "op" | "component" | "component_ref"
1369            )
1370        });
1371        if legacy_only {
1372            return (input, config);
1373        }
1374    }
1375    (payload, Value::Null)
1376}
1377
1378fn resolve_component_operation(
1379    node_id: &str,
1380    component_label: &str,
1381    payload_operation: Option<String>,
1382    operation_override: Option<&str>,
1383    operation_in_mapping: Option<&str>,
1384) -> Result<String> {
1385    if let Some(op) = operation_override
1386        .map(str::trim)
1387        .filter(|value| !value.is_empty())
1388    {
1389        return Ok(op.to_string());
1390    }
1391
1392    if let Some(op) = payload_operation
1393        .as_deref()
1394        .map(str::trim)
1395        .filter(|value| !value.is_empty())
1396    {
1397        return Ok(op.to_string());
1398    }
1399
1400    let mut message = format!(
1401        "missing operation for node `{}` (component `{}`); expected node.component.operation to be set",
1402        node_id, component_label,
1403    );
1404    if let Some(found) = operation_in_mapping {
1405        message.push_str(&format!(
1406            ". Found operation in input.mapping (`{}`) but this is not used; pack compiler must preserve node.component.operation.",
1407            found
1408        ));
1409    }
1410    bail!(message);
1411}
1412
1413fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
1414    match component_ref {
1415        "emit.log" => EmitKind::Log,
1416        "emit.response" => EmitKind::Response,
1417        other => EmitKind::Other(other.to_string()),
1418    }
1419}
1420
1421fn emit_ref_from_kind(kind: &EmitKind) -> String {
1422    match kind {
1423        EmitKind::Log => "emit.log".to_string(),
1424        EmitKind::Response => "emit.response".to_string(),
1425        EmitKind::Other(other) => other.clone(),
1426    }
1427}
1428
#[cfg(test)]
mod tests {
    use super::*;
    use crate::validate::{ValidationConfig, ValidationMode};
    use greentic_types::{
        Flow, FlowComponentRef, FlowId, FlowKind, InputMapping, Node, NodeId, OutputMapping,
        Routing, TelemetryHints,
    };
    use serde_json::json;
    use std::collections::BTreeMap;
    use std::str::FromStr;
    use std::sync::Mutex;
    use tokio::runtime::Runtime;

    /// Engine with no packs, no flows, an empty flow cache and validation off.
    fn minimal_engine() -> FlowEngine {
        FlowEngine {
            packs: Vec::new(),
            flows: Vec::new(),
            flow_sources: HashMap::new(),
            flow_cache: RwLock::new(HashMap::new()),
            default_env: "local".to_string(),
            validation: ValidationConfig {
                mode: ValidationMode::Off,
            },
        }
    }

    /// Context for pack `test-pack` on its first attempt, with a one-attempt
    /// retry policy and no session, provider, tool, action or mocks.
    fn test_context<'a>(
        tenant: &'a str,
        flow_id: &'a str,
        node_id: Option<&'a str>,
        observer: Option<&'a dyn ExecutionObserver>,
    ) -> FlowContext<'a> {
        FlowContext {
            tenant,
            pack_id: "test-pack",
            flow_id,
            node_id,
            tool: None,
            action: None,
            session_id: None,
            provider_id: None,
            retry_config: RetryConfig {
                max_attempts: 1,
                base_delay_ms: 1,
            },
            attempt: 1,
            observer,
            mocks: None,
        }
    }

    /// Runs `execute_component_exec` for an exec node that has no operation
    /// name and returns the resulting error message.
    fn missing_operation_message(node_id: &str, operation_in_mapping: Option<&str>) -> String {
        let engine = minimal_engine();
        let rt = Runtime::new().unwrap();
        let ctx = test_context("tenant", "flow", Some(node_id), None);
        let node = HostNode {
            kind: NodeKind::Exec {
                target_component: "qa.process".into(),
            },
            component: "component.exec".into(),
            component_id: "component.exec".into(),
            operation_name: None,
            operation_in_mapping: operation_in_mapping.map(str::to_string),
            payload_expr: Value::Null,
            routing: Routing::End,
        };
        let payload = json!({ "component": "qa.process" });
        let event = NodeEvent {
            context: &ctx,
            node_id,
            node: &node,
            payload: &payload,
        };
        rt.block_on(engine.execute_component_exec(
            &ctx,
            node_id,
            &node,
            payload.clone(),
            &event,
            ComponentOverrides {
                component: None,
                operation: None,
            },
        ))
        .unwrap_err()
        .to_string()
    }

    #[test]
    fn templating_renders_with_partials_and_data() {
        let mut state = ExecutionState::new(json!({ "city": "London" }));
        state.nodes.insert(
            "forecast".to_string(),
            NodeOutput::new(json!({ "temp": "20C" })),
        );

        // templating context includes node outputs for runner-side payload rendering.
        let ctx = state.context();
        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
    }

    #[test]
    fn finalize_wraps_emitted_payloads() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "first" }));
        state.push_egress(json!({ "text": "second" }));
        let result = state.finalize_with(Some(json!({ "text": "final" })));
        assert_eq!(
            result,
            json!([
                { "text": "first" },
                { "text": "second" },
                { "text": "final" }
            ])
        );
    }

    #[test]
    fn finalize_flattens_final_array() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "only" }));
        let result = state.finalize_with(Some(json!([
            { "text": "extra-1" },
            { "text": "extra-2" }
        ])));
        assert_eq!(
            result,
            json!([
                { "text": "only" },
                { "text": "extra-1" },
                { "text": "extra-2" }
            ])
        );
    }

    #[test]
    fn missing_operation_reports_node_and_component() {
        let message = missing_operation_message("missing-op", None);
        assert!(
            message.contains("missing operation for node `missing-op`"),
            "unexpected message: {message}"
        );
        assert!(
            message.contains("(component `component.exec`)"),
            "unexpected message: {message}"
        );
    }

    #[test]
    fn missing_operation_mentions_mapping_hint() {
        let message = missing_operation_message("missing-op-hint", Some("render"));
        assert!(
            message.contains("missing operation for node `missing-op-hint`"),
            "unexpected message: {message}"
        );
        assert!(
            message.contains("Found operation in input.mapping (`render`)"),
            "unexpected message: {message}"
        );
    }

    /// Observer that records started node ids and the outputs of ended nodes.
    struct CountingObserver {
        starts: Mutex<Vec<String>>,
        ends: Mutex<Vec<Value>>,
    }

    impl CountingObserver {
        fn new() -> Self {
            Self {
                starts: Mutex::new(Vec::new()),
                ends: Mutex::new(Vec::new()),
            }
        }
    }

    impl ExecutionObserver for CountingObserver {
        fn on_node_start(&self, event: &NodeEvent<'_>) {
            self.starts.lock().unwrap().push(event.node_id.to_string());
        }

        fn on_node_end(&self, _event: &NodeEvent<'_>, output: &Value) {
            self.ends.lock().unwrap().push(output.clone());
        }

        fn on_node_error(&self, _event: &NodeEvent<'_>, _error: &dyn StdError) {}
    }

    #[test]
    fn emits_end_event_for_successful_node() {
        let node_id = NodeId::from_str("emit").unwrap();
        let node = Node {
            id: node_id.clone(),
            component: FlowComponentRef {
                id: "emit.log".parse().unwrap(),
                pack_alias: None,
                operation: None,
            },
            input: InputMapping {
                mapping: json!({ "message": "logged" }),
            },
            output: OutputMapping {
                mapping: Value::Null,
            },
            routing: Routing::End,
            telemetry: TelemetryHints::default(),
        };
        let mut nodes = indexmap::IndexMap::default();
        nodes.insert(node_id.clone(), node);
        let flow = Flow {
            schema_version: "1.0".into(),
            id: FlowId::from_str("emit.flow").unwrap(),
            kind: FlowKind::Messaging,
            entrypoints: BTreeMap::from([(
                "default".to_string(),
                Value::String(node_id.to_string()),
            )]),
            nodes,
            metadata: Default::default(),
        };

        // Seed the cache directly so the flow resolves without any pack loaded.
        let engine = minimal_engine();
        engine.flow_cache.write().insert(
            FlowKey {
                pack_id: "test-pack".to_string(),
                flow_id: "emit.flow".to_string(),
            },
            HostFlow::from(flow),
        );

        let observer = CountingObserver::new();
        let ctx = test_context("demo", "emit.flow", None, Some(&observer));

        let rt = Runtime::new().unwrap();
        let result = rt.block_on(engine.execute(ctx, Value::Null)).unwrap();
        assert!(matches!(result.status, FlowStatus::Completed));

        let starts = observer.starts.lock().unwrap();
        let ends = observer.ends.lock().unwrap();
        assert_eq!(starts.len(), 1);
        assert_eq!(ends.len(), 1);
        assert_eq!(ends[0], json!({ "message": "logged" }));
    }
}
1739
1740use tracing::Instrument;
1741
/// Borrowed, per-invocation context handed to the flow engine.
1742pub struct FlowContext<'a> {
    /// Tenant identifier; copied into validation metadata (`tenant_id`) and
    /// into the component execution context.
1743    pub tenant: &'a str,
    /// Pack that owns the flow.
1744    pub pack_id: &'a str,
    /// Flow being executed.
1745    pub flow_id: &'a str,
    /// Optional node identifier. NOTE(review): not read anywhere in this
    /// chunk; presumably the node being resumed or targeted — confirm.
1746    pub node_id: Option<&'a str>,
    /// Optional tool name. NOTE(review): usage is outside this chunk.
1747    pub tool: Option<&'a str>,
    /// Optional action name. NOTE(review): usage is outside this chunk.
1748    pub action: Option<&'a str>,
    /// Optional session id; emitted as `session.id` in validation metadata and
    /// reused as correlation id and idempotency key for component calls.
1749    pub session_id: Option<&'a str>,
    /// Optional provider id; `component_exec_ctx` forwards it as the `user`
    /// of the component tenant context.
1750    pub provider_id: Option<&'a str>,
    /// Retry policy for this invocation.
1751    pub retry_config: RetryConfig,
    /// Attempt number forwarded to components. NOTE(review): the tests start
    /// at 1, so this is presumably 1-based — confirm.
1752    pub attempt: u32,
    /// Optional observer notified about validation issues (and presumably the
    /// node lifecycle; those call sites are outside this chunk).
1753    pub observer: Option<&'a dyn ExecutionObserver>,
    /// Optional mock layer. NOTE(review): usage is outside this chunk.
1754    pub mocks: Option<&'a MockLayer>,
1755}
1756
/// Retry policy applied to a flow invocation.
///
/// Plain copyable data. `Debug`, `PartialEq` and `Eq` are derived so the
/// policy can be logged and compared in tests.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct RetryConfig {
    /// Maximum number of attempts.
    pub max_attempts: u32,
    /// Base delay in milliseconds.
    /// NOTE(review): presumably the base for a backoff computation — confirm at the call site.
    pub base_delay_ms: u64,
}
1762
1763fn should_retry(err: &anyhow::Error) -> bool {
1764    let lower = err.to_string().to_lowercase();
1765    lower.contains("transient")
1766        || lower.contains("unavailable")
1767        || lower.contains("internal")
1768        || lower.contains("timeout")
1769}
1770
1771impl From<FlowRetryConfig> for RetryConfig {
1772    fn from(value: FlowRetryConfig) -> Self {
1773        Self {
1774            max_attempts: value.max_attempts.max(1),
1775            base_delay_ms: value.base_delay_ms.max(50),
1776        }
1777    }
1778}