Skip to main content

greentic_runner_host/runner/
engine.rs

1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use super::templating::{TemplateOptions, render_template_value};
18use crate::config::{FlowRetryConfig, HostConfig};
19use crate::pack::{FlowDescriptor, PackRuntime};
20use crate::runner::invocation::{InvocationMeta, build_invocation_envelope};
21use crate::telemetry::{
22    FlowSpanAttributes, RolloutIds, annotate_span, backoff_delay_ms, set_flow_context,
23};
24#[cfg(feature = "fault-injection")]
25use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
26use crate::validate::{
27    ValidationConfig, ValidationIssue, ValidationMode, validate_component_envelope,
28    validate_tool_envelope,
29};
30use greentic_flow::SLOT_SCHEMA_METADATA_KEY;
31use greentic_types::{Flow, Node, NodeId, Routing};
32
33/// Component ID of the slot-extractor WASM component. Used to detect
34/// slot-extractor nodes and inject flow-level `slot_schema` as
35/// `slot_definitions` into the invocation payload (Phase D).
36const SLOT_EXTRACTOR_COMPONENT_ID: &str = "ai.greentic.component-slot-extractor";
37
38/// Callback trait for resolving cross-pack provider invocations.
39///
40/// When a `provider.invoke` node references a provider that is not in the
41/// current pack, the flow engine calls this resolver as a fallback.
42/// Implementations typically delegate to a capability registry that knows
43/// about all packs in the bundle.
44pub trait CrossPackResolver: Send + Sync {
45    fn invoke(
46        &self,
47        provider_id: &str,
48        provider_type: Option<&str>,
49        op: &str,
50        input: &[u8],
51        tenant: &str,
52        team: Option<&str>,
53    ) -> Result<Value>;
54}
55
56pub struct FlowEngine {
57    packs: Vec<Arc<PackRuntime>>,
58    flows: Vec<FlowDescriptor>,
59    flow_sources: HashMap<FlowKey, usize>,
60    flow_cache: RwLock<HashMap<FlowKey, HostFlow>>,
61    default_env: String,
62    validation: ValidationConfig,
63    cross_pack_resolver: Option<Arc<dyn CrossPackResolver>>,
64    /// Rollout identifiers of the revision-keyed runtime this engine belongs to,
65    /// stamped onto every per-invocation `TenantCtx` for telemetry attribution
66    /// (C5.4). Empty for tenant-only (legacy) runtimes; the Phase-D revision
67    /// dispatcher supplies real IDs via [`with_rollout_ids`](Self::with_rollout_ids).
68    rollout_ids: RolloutIds,
69}
70
71#[derive(Clone, Debug, PartialEq, Eq, Hash)]
72struct FlowKey {
73    pack_id: String,
74    flow_id: String,
75}
76
77#[derive(Clone, Debug, Serialize, Deserialize)]
78pub struct FlowSnapshot {
79    pub pack_id: String,
80    pub flow_id: String,
81    #[serde(default, skip_serializing_if = "Option::is_none")]
82    pub next_flow: Option<String>,
83    pub next_node: String,
84    pub state: ExecutionState,
85}
86
87#[derive(Clone, Debug)]
88pub struct FlowWait {
89    pub reason: Option<String>,
90    pub snapshot: FlowSnapshot,
91}
92
93#[derive(Clone, Debug)]
94pub enum FlowStatus {
95    Completed,
96    Waiting(Box<FlowWait>),
97}
98
99#[derive(Clone, Debug)]
100pub struct FlowExecution {
101    pub output: Value,
102    pub status: FlowStatus,
103}
104
105#[derive(Clone, Debug)]
106struct HostFlow {
107    id: String,
108    start: Option<NodeId>,
109    nodes: IndexMap<NodeId, HostNode>,
110    /// Flow-level slot definitions extracted from `metadata.extra["greentic.slot_schema"]`.
111    /// Injected into slot-extractor component invocations at dispatch time (Phase D).
112    slot_schema: Option<Value>,
113}
114
115#[derive(Clone, Debug)]
116pub struct HostNode {
117    kind: NodeKind,
118    /// Backwards-compatible component label for observers/transcript.
119    pub component: String,
120    component_id: String,
121    operation_name: Option<String>,
122    operation_in_mapping: Option<String>,
123    payload_expr: Value,
124    routing: Routing,
125}
126
127impl HostNode {
128    pub fn component_id(&self) -> &str {
129        &self.component_id
130    }
131
132    pub fn operation_name(&self) -> Option<&str> {
133        self.operation_name.as_deref()
134    }
135
136    pub fn operation_in_mapping(&self) -> Option<&str> {
137        self.operation_in_mapping.as_deref()
138    }
139}
140
141#[derive(Clone, Debug)]
142enum NodeKind {
143    Exec { target_component: String },
144    PackComponent { component_ref: String },
145    ProviderInvoke,
146    FlowCall,
147    BuiltinEmit { kind: EmitKind },
148    BuiltinStateGet,
149    BuiltinStateSet,
150    Wait,
151}
152
153#[derive(Clone, Debug)]
154enum EmitKind {
155    Log,
156    Response,
157    Other(String),
158}
159
160struct ComponentOverrides<'a> {
161    component: Option<&'a str>,
162    operation: Option<&'a str>,
163}
164
165struct ComponentCall {
166    component_ref: String,
167    operation: String,
168    input: Value,
169    config: Value,
170}
171
172impl FlowExecution {
173    fn completed(output: Value) -> Self {
174        Self {
175            output,
176            status: FlowStatus::Completed,
177        }
178    }
179
180    fn waiting(output: Value, wait: FlowWait) -> Self {
181        Self {
182            output,
183            status: FlowStatus::Waiting(Box::new(wait)),
184        }
185    }
186}
187
188impl FlowEngine {
189    pub async fn new(packs: Vec<Arc<PackRuntime>>, config: Arc<HostConfig>) -> Result<Self> {
190        let mut flow_sources: HashMap<FlowKey, usize> = HashMap::new();
191        let mut descriptors = Vec::new();
192        let mut bindings = HashMap::new();
193        for pack in &config.pack_bindings {
194            bindings.insert(pack.pack_id.clone(), pack.flows.clone());
195        }
196        let enforce_bindings = !bindings.is_empty();
197        for (idx, pack) in packs.iter().enumerate() {
198            let pack_id = pack.metadata().pack_id.clone();
199            if enforce_bindings && !bindings.contains_key(&pack_id) {
200                bail!("no gtbind entries found for pack {}", pack_id);
201            }
202            let flows = pack.list_flows().await?;
203            let allowed = bindings.get(&pack_id).map(|flows| {
204                flows
205                    .iter()
206                    .cloned()
207                    .collect::<std::collections::HashSet<_>>()
208            });
209            let mut seen = std::collections::HashSet::new();
210            for flow in flows {
211                if let Some(ref allow) = allowed
212                    && !allow.contains(&flow.id)
213                {
214                    continue;
215                }
216                seen.insert(flow.id.clone());
217                tracing::info!(
218                    flow_id = %flow.id,
219                    flow_type = %flow.flow_type,
220                    pack_id = %flow.pack_id,
221                    pack_index = idx,
222                    "registered flow"
223                );
224                if let Ok(flow_ir) = pack.load_flow(&flow.id) {
225                    for node in flow_ir.nodes.values() {
226                        config
227                            .secrets_policy
228                            .register_flow_secret_refs(&node.input.mapping);
229                        config
230                            .secrets_policy
231                            .register_flow_secret_refs(&node.output.mapping);
232                    }
233                }
234                flow_sources.insert(
235                    FlowKey {
236                        pack_id: flow.pack_id.clone(),
237                        flow_id: flow.id.clone(),
238                    },
239                    idx,
240                );
241                descriptors.retain(|existing: &FlowDescriptor| {
242                    !(existing.id == flow.id && existing.pack_id == flow.pack_id)
243                });
244                descriptors.push(flow);
245            }
246            if let Some(allow) = allowed {
247                let missing = allow.difference(&seen).cloned().collect::<Vec<_>>();
248                if !missing.is_empty() {
249                    bail!(
250                        "gtbind flow ids missing in pack {}: {}",
251                        pack_id,
252                        missing.join(", ")
253                    );
254                }
255            }
256        }
257
258        let mut flow_map = HashMap::new();
259        for flow in &descriptors {
260            let pack_id = flow.pack_id.clone();
261            if let Some(&pack_idx) = flow_sources.get(&FlowKey {
262                pack_id: pack_id.clone(),
263                flow_id: flow.id.clone(),
264            }) {
265                let pack_clone = Arc::clone(&packs[pack_idx]);
266                let flow_id = flow.id.clone();
267                let task_flow_id = flow_id.clone();
268                match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
269                    Ok(Ok(loaded_flow)) => {
270                        flow_map.insert(
271                            FlowKey {
272                                pack_id: pack_id.clone(),
273                                flow_id,
274                            },
275                            HostFlow::from(loaded_flow),
276                        );
277                    }
278                    Ok(Err(err)) => {
279                        tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
280                    }
281                    Err(err) => {
282                        tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
283                    }
284                }
285            }
286        }
287
288        Ok(Self {
289            packs,
290            flows: descriptors,
291            flow_sources,
292            flow_cache: RwLock::new(flow_map),
293            default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
294            validation: config.validation.clone(),
295            cross_pack_resolver: None,
296            rollout_ids: RolloutIds::default(),
297        })
298    }
299
300    /// Bind the rollout identifiers of the revision-keyed runtime this engine
301    /// serves, so every invocation's telemetry carries deployment/bundle/
302    /// revision attribution (C5.4). Called by the Phase-D revision dispatcher
303    /// when it constructs a revision runtime; tenant-only runtimes leave the
304    /// default (empty) IDs.
305    pub fn with_rollout_ids(mut self, rollout_ids: RolloutIds) -> Self {
306        self.rollout_ids = rollout_ids;
307        self
308    }
309
310    /// The rollout identifiers bound to this engine (read counterpart to
311    /// [`with_rollout_ids`](Self::with_rollout_ids)). Empty by default for the
312    /// legacy tenant-only path.
313    pub fn rollout_ids(&self) -> &RolloutIds {
314        &self.rollout_ids
315    }
316
317    /// Set an optional cross-pack resolver for `provider.invoke` nodes that
318    /// reference providers in other packs (resolved via capability registry).
319    pub fn set_cross_pack_resolver(&mut self, resolver: Arc<dyn CrossPackResolver>) {
320        self.cross_pack_resolver = Some(resolver);
321    }
322
323    async fn get_or_load_flow(&self, pack_id: &str, flow_id: &str) -> Result<HostFlow> {
324        let key = FlowKey {
325            pack_id: pack_id.to_string(),
326            flow_id: flow_id.to_string(),
327        };
328        if let Some(flow) = self.flow_cache.read().get(&key).cloned() {
329            return Ok(flow);
330        }
331
332        let pack_idx = *self
333            .flow_sources
334            .get(&key)
335            .with_context(|| format!("flow {pack_id}:{flow_id} not registered"))?;
336        let pack = Arc::clone(&self.packs[pack_idx]);
337        let flow_id_owned = flow_id.to_string();
338        let task_flow_id = flow_id_owned.clone();
339        let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
340            .await
341            .context("failed to join flow metadata task")??;
342        let host_flow = HostFlow::from(flow);
343        self.flow_cache.write().insert(
344            FlowKey {
345                pack_id: pack_id.to_string(),
346                flow_id: flow_id_owned.clone(),
347            },
348            host_flow.clone(),
349        );
350        Ok(host_flow)
351    }
352
353    /// Create the `flow.execute` span and install per-invocation telemetry:
354    /// declared span fields, the task-local tenant context, and the **exported**
355    /// `gt.*` attribution — the live `pack_id` plus any rollout identifiers from
356    /// the owning revision runtime (C5.4). Returned for the caller to
357    /// `.instrument()`. Both `execute` and `resume` route through here so every
358    /// per-invocation entry point carries the same attribution.
359    fn flow_execute_span(&self, ctx: &FlowContext<'_>) -> tracing::Span {
360        let span = tracing::info_span!(
361            "flow.execute",
362            tenant = tracing::field::Empty,
363            flow_id = tracing::field::Empty,
364            node_id = tracing::field::Empty,
365            tool = tracing::field::Empty,
366            action = tracing::field::Empty
367        );
368        annotate_span(
369            &span,
370            &FlowSpanAttributes {
371                tenant: ctx.tenant,
372                flow_id: ctx.flow_id,
373                node_id: ctx.node_id,
374                tool: ctx.tool,
375                action: ctx.action,
376            },
377        );
378        set_flow_context(
379            &span,
380            &self.default_env,
381            ctx.tenant,
382            ctx.flow_id,
383            ctx.node_id,
384            ctx.provider_id,
385            ctx.session_id,
386            ctx.pack_id,
387            &self.rollout_ids,
388        );
389        span
390    }
391
392    pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
393        let span = self.flow_execute_span(&ctx);
394        let retry_config = ctx.retry_config;
395        let original_input = input;
396        let mut ctx = ctx;
397        let metric_tenant = ctx.tenant.to_string();
398        let metric_flow_id = ctx.flow_id.to_string();
399        let started = std::time::Instant::now();
400        let result = async move {
401            let mut attempt = 0u32;
402            loop {
403                attempt += 1;
404                ctx.attempt = attempt;
405                #[cfg(feature = "fault-injection")]
406                {
407                    let fault_ctx = FaultContext {
408                        pack_id: ctx.pack_id,
409                        flow_id: ctx.flow_id,
410                        node_id: ctx.node_id,
411                        attempt: ctx.attempt,
412                    };
413                    maybe_fail(FaultPoint::Timeout, fault_ctx)
414                        .map_err(|err| anyhow!(err.to_string()))?;
415                }
416                match self.execute_once(&ctx, original_input.clone()).await {
417                    Ok(value) => return Ok(value),
418                    Err(err) => {
419                        if attempt >= retry_config.max_attempts || !should_retry(&err) {
420                            // User-facing session flows surface the terminal
421                            // error as a metadata-only Ok envelope so the
422                            // messaging provider renders it instead of leaking
423                            // raw engine text to the chat.
424                            if ctx.session_id.is_some() {
425                                return Ok(FlowExecution::completed(json!({
426                                    "metadata": {
427                                        "error_kind": "flow_execution_failed",
428                                        "error_message": err.to_string(),
429                                        "flow_id": ctx.flow_id,
430                                    }
431                                })));
432                            }
433                            return Err(err);
434                        }
435                        let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
436                        tracing::warn!(
437                            tenant = ctx.tenant,
438                            flow_id = ctx.flow_id,
439                            attempt,
440                            max_attempts = retry_config.max_attempts,
441                            delay_ms = delay,
442                            error = %err,
443                            "transient flow execution failure, backing off"
444                        );
445                        tokio::time::sleep(Duration::from_millis(delay)).await;
446                    }
447                }
448            }
449        }
450        .instrument(span)
451        .await;
452        let status = if result.is_ok() { "ok" } else { "err" };
453        let duration_ms = started.elapsed().as_secs_f64() * 1000.0;
454        crate::metrics::record_flow_execution(&metric_tenant, &metric_flow_id, status, duration_ms);
455        result
456    }
457
458    pub async fn resume(
459        &self,
460        ctx: FlowContext<'_>,
461        snapshot: FlowSnapshot,
462        input: Value,
463    ) -> Result<FlowExecution> {
464        if snapshot.pack_id != ctx.pack_id {
465            bail!(
466                "snapshot pack {} does not match requested {}",
467                snapshot.pack_id,
468                ctx.pack_id
469            );
470        }
471        let resume_flow = snapshot
472            .next_flow
473            .clone()
474            .unwrap_or_else(|| snapshot.flow_id.clone());
475        let flow_ir = self.get_or_load_flow(ctx.pack_id, &resume_flow).await?;
476        let mut state = snapshot.state;
477        // Replace BOTH `input` AND `entry` with the new activity. The
478        // routing context (built by `build_routing_context`) reads
479        // `entry.input.metadata.*` for the synthesised `response.*` fields
480        // that conditional routes test against — keeping the snapshot's
481        // stale entry would make `response.action` perpetually empty and
482        // every condition fail, looping the user back to the wait point
483        // forever. `replace_input` only touches `state.input`, so we have
484        // to refresh `entry` ourselves; `ensure_entry` is a no-op once
485        // entry is non-null.
486        state.replace_input(input.clone());
487        state.entry = input;
488        let span = self.flow_execute_span(&ctx);
489        self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node), resume_flow)
490            .instrument(span)
491            .await
492    }
493
494    async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
495        let flow_ir = self.get_or_load_flow(ctx.pack_id, ctx.flow_id).await?;
496        let state = ExecutionState::new(input);
497        self.drive_flow(ctx, flow_ir, state, None, ctx.flow_id.to_string())
498            .await
499    }
500
501    async fn drive_flow(
502        &self,
503        ctx: &FlowContext<'_>,
504        mut flow_ir: HostFlow,
505        mut state: ExecutionState,
506        resume_from: Option<String>,
507        mut current_flow_id: String,
508    ) -> Result<FlowExecution> {
509        let mut current = match resume_from {
510            Some(node) => NodeId::from_str(&node)
511                .with_context(|| format!("invalid resume node id `{node}`"))?,
512            None => flow_ir
513                .start
514                .clone()
515                .or_else(|| flow_ir.nodes.keys().next().cloned())
516                .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
517        };
518
519        loop {
520            let step_ctx = FlowContext {
521                tenant: ctx.tenant,
522                pack_id: ctx.pack_id,
523                flow_id: current_flow_id.as_str(),
524                node_id: ctx.node_id,
525                tool: ctx.tool,
526                action: ctx.action,
527                session_id: ctx.session_id,
528                provider_id: ctx.provider_id,
529                retry_config: ctx.retry_config,
530                attempt: ctx.attempt,
531                observer: ctx.observer,
532                mocks: ctx.mocks,
533            };
534            let node = flow_ir
535                .nodes
536                .get(&current)
537                .with_context(|| format!("node {} not found", current.as_str()))?;
538
539            let payload_template = node.payload_expr.clone();
540            let prev = state
541                .last_output
542                .as_ref()
543                .cloned()
544                .unwrap_or_else(|| Value::Object(JsonMap::new()));
545            let ctx_value = template_context(&state, prev);
546            #[cfg(feature = "fault-injection")]
547            {
548                let fault_ctx = FaultContext {
549                    pack_id: ctx.pack_id,
550                    flow_id: ctx.flow_id,
551                    node_id: Some(current.as_str()),
552                    attempt: ctx.attempt,
553                };
554                maybe_fail(FaultPoint::TemplateRender, fault_ctx)
555                    .map_err(|err| anyhow!(err.to_string()))?;
556            }
557            let mut payload =
558                render_template_value(&payload_template, &ctx_value, TemplateOptions::default())
559                    .context("failed to render node input template")?;
560            let node_id = current.clone();
561
562            // Phase D: inject flow-level slot_schema as slot_definitions into
563            // the slot-extractor's input when the author omitted inline
564            // definitions. Explicit inline `slot_definitions` win
565            // (back-compat with M2.4 NDA demo).
566            if let NodeKind::Exec { target_component } = &node.kind
567                && target_component == SLOT_EXTRACTOR_COMPONENT_ID
568                && let Some(schema) = flow_ir.slot_schema.as_ref()
569                && let Some(map) = payload.as_object_mut()
570            {
571                let input = map.entry("input").or_insert(Value::Null);
572                inject_slot_definitions(input, schema, step_ctx.flow_id, node_id.as_str());
573            }
574
575            let observed_payload = payload.clone();
576            let event = NodeEvent {
577                context: &step_ctx,
578                node_id: node_id.as_str(),
579                node,
580                payload: &observed_payload,
581            };
582            if let Some(observer) = step_ctx.observer {
583                observer.on_node_start(&event);
584            }
585            let dispatch = self
586                .dispatch_node(
587                    &step_ctx,
588                    node_id.as_str(),
589                    node,
590                    &mut state,
591                    payload,
592                    &event,
593                )
594                .await;
595            let DispatchOutcome { output, control } = match dispatch {
596                Ok(outcome) => outcome,
597                Err(err) => {
598                    if let Some(observer) = step_ctx.observer {
599                        observer.on_node_error(&event, err.as_ref());
600                    }
601                    // Propagate so `execute()`'s retry loop can retry transient
602                    // failures, then convert to a metadata-only Ok envelope at
603                    // the top level once retries are exhausted (session flows).
604                    return Err(err);
605                }
606            };
607
608            state.nodes.insert(node_id.clone().into(), output.clone());
609            state.last_output = Some(output.payload.clone());
610            if let Some(observer) = step_ctx.observer {
611                observer.on_node_end(&event, &output.payload);
612            }
613
614            match control {
615                NodeControl::Continue => {
616                    enum NextDecision {
617                        Next(NodeId),
618                        End,
619                        Wait,
620                    }
621                    let decision = match &node.routing {
622                        Routing::Next { node_id } => NextDecision::Next(node_id.clone()),
623                        Routing::End | Routing::Reply => NextDecision::End,
624                        Routing::Branch { default, .. } => match default {
625                            Some(target) => NextDecision::Next(target.clone()),
626                            None => NextDecision::End,
627                        },
628                        Routing::Custom(raw) => {
629                            match evaluate_custom_routing(raw, &output, &state, &flow_ir, &node_id)
630                            {
631                                CustomRoutingDecision::Next(nid) => NextDecision::Next(nid),
632                                CustomRoutingDecision::End => NextDecision::End,
633                                CustomRoutingDecision::Wait => NextDecision::Wait,
634                            }
635                        }
636                    };
637
638                    match decision {
639                        NextDecision::Next(n) => current = n,
640                        NextDecision::End => {
641                            let nodes_snapshot = state.nodes.clone();
642                            let final_output = state.finalize_with(Some(output.payload.clone()));
643                            return Ok(FlowExecution::completed(lift_first_node_error_from_nodes(
644                                final_output,
645                                &nodes_snapshot,
646                            )));
647                        }
648                        NextDecision::Wait => {
649                            // Conditional routing fell through. Pause at the
650                            // current node so the next inbound activity
651                            // resumes here and re-evaluates this node's
652                            // routing with the user's new submit payload.
653                            let mut snapshot_state = state.clone();
654                            snapshot_state.clear_egress();
655                            let snapshot = FlowSnapshot {
656                                pack_id: step_ctx.pack_id.to_string(),
657                                flow_id: step_ctx.flow_id.to_string(),
658                                next_flow: (current_flow_id != step_ctx.flow_id)
659                                    .then_some(current_flow_id.clone()),
660                                next_node: node_id.as_str().to_string(),
661                                state: snapshot_state,
662                            };
663                            let output_value = state.finalize_with(Some(output.payload.clone()));
664                            return Ok(FlowExecution::waiting(
665                                output_value,
666                                FlowWait {
667                                    reason: Some(format!(
668                                        "awaiting user submit at node `{}`",
669                                        node_id.as_str()
670                                    )),
671                                    snapshot,
672                                },
673                            ));
674                        }
675                    }
676                }
677                NodeControl::Wait { reason } => {
678                    let next: Option<NodeId> = match &node.routing {
679                        Routing::Next { node_id } => Some(node_id.clone()),
680                        Routing::End | Routing::Reply => None,
681                        Routing::Branch { default, .. } => default.clone(),
682                        Routing::Custom(raw) => {
683                            match evaluate_custom_routing(raw, &output, &state, &flow_ir, &node_id)
684                            {
685                                CustomRoutingDecision::Next(nid) => Some(nid),
686                                // session.wait operator must have an
687                                // explicit forward target — both End and
688                                // Wait decisions collapse to "no next" and
689                                // surface the same error below.
690                                CustomRoutingDecision::End | CustomRoutingDecision::Wait => None,
691                            }
692                        }
693                    };
694                    let resume_target = next.ok_or_else(|| {
695                        anyhow!(
696                            "session.wait node {} requires a non-empty route",
697                            current.as_str()
698                        )
699                    })?;
700                    let mut snapshot_state = state.clone();
701                    snapshot_state.clear_egress();
702                    let snapshot = FlowSnapshot {
703                        pack_id: step_ctx.pack_id.to_string(),
704                        flow_id: step_ctx.flow_id.to_string(),
705                        next_flow: (current_flow_id != step_ctx.flow_id)
706                            .then_some(current_flow_id.clone()),
707                        next_node: resume_target.as_str().to_string(),
708                        state: snapshot_state,
709                    };
710                    let output_value = state.clone().finalize_with(None);
711                    return Ok(FlowExecution::waiting(
712                        output_value,
713                        FlowWait { reason, snapshot },
714                    ));
715                }
716                NodeControl::Jump(jump) => {
717                    let jump_target = self.apply_jump(&step_ctx, &mut state, jump).await?;
718                    flow_ir = jump_target.flow;
719                    current_flow_id = jump_target.flow_id;
720                    current = jump_target.node_id;
721                }
722                NodeControl::Respond {
723                    text,
724                    card_cbor,
725                    needs_user,
726                } => {
727                    let response = json!({
728                        "text": text,
729                        "card_cbor": card_cbor,
730                        "needs_user": needs_user,
731                    });
732                    state.push_egress(response);
733                    let nodes_snapshot = state.nodes.clone();
734                    let final_output = state.finalize_with(None);
735                    return Ok(FlowExecution::completed(lift_first_node_error_from_nodes(
736                        final_output,
737                        &nodes_snapshot,
738                    )));
739                }
740            }
741        }
742    }
743
744    async fn dispatch_node(
745        &self,
746        ctx: &FlowContext<'_>,
747        node_id: &str,
748        node: &HostNode,
749        state: &mut ExecutionState,
750        mut payload: Value,
751        event: &NodeEvent<'_>,
752    ) -> Result<DispatchOutcome> {
753        inject_card_locale(&mut payload, &state.entry);
754        match &node.kind {
755            NodeKind::Exec { target_component } => self
756                .execute_component_exec(
757                    ctx,
758                    node_id,
759                    node,
760                    payload,
761                    event,
762                    ComponentOverrides {
763                        component: Some(target_component.as_str()),
764                        operation: node.operation_name.as_deref(),
765                    },
766                )
767                .await
768                .and_then(component_dispatch_outcome),
769            NodeKind::PackComponent { component_ref } => self
770                .execute_component_call(ctx, node_id, node, payload, component_ref.as_str(), event)
771                .await
772                .and_then(component_dispatch_outcome),
773            NodeKind::FlowCall => self
774                .execute_flow_call(ctx, payload)
775                .await
776                .map(DispatchOutcome::complete),
777            NodeKind::ProviderInvoke => self
778                .execute_provider_invoke(ctx, node_id, state, payload, event)
779                .await
780                .map(DispatchOutcome::complete),
781            NodeKind::BuiltinEmit { kind } => {
782                match kind {
783                    EmitKind::Log | EmitKind::Response => {}
784                    EmitKind::Other(component) => {
785                        tracing::debug!(%component, "handling emit.* as builtin");
786                    }
787                }
788                state.push_egress(payload.clone());
789                Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
790            }
791            NodeKind::BuiltinStateGet => self
792                .execute_state_get(ctx, payload)
793                .await
794                .map(DispatchOutcome::complete),
795            NodeKind::BuiltinStateSet => self
796                .execute_state_set(ctx, payload)
797                .await
798                .map(DispatchOutcome::complete),
799            NodeKind::Wait => {
800                let reason = extract_wait_reason(&payload);
801                Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
802            }
803        }
804    }
805
806    async fn execute_state_get(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
807        let key = Self::extract_state_key_helper(&payload)?;
808        let pack = self.pack_for_flow(ctx)?;
809        let store = pack
810            .state_store_handle()
811            .context("state store is not configured for this runtime")?;
812        let tenant_ctx = self.state_tenant_ctx(ctx)?;
813        let state_key = greentic_state::StateKey::new(&key);
814        let value = store
815            .get_json(
816                &tenant_ctx,
817                crate::storage::state::STATE_PREFIX,
818                &state_key,
819                None,
820            )
821            .with_context(|| format!("state.get failed for key `{key}`"))?;
822        let payload = serde_json::json!({
823            "key": key,
824            "value": value,
825            "found": value.is_some(),
826        });
827        Ok(NodeOutput::new(payload))
828    }
829
830    async fn execute_state_set(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
831        let key = Self::extract_state_key_helper(&payload)?;
832        let value = payload.get("value").cloned().unwrap_or(Value::Null);
833        let pack = self.pack_for_flow(ctx)?;
834        let store = pack
835            .state_store_handle()
836            .context("state store is not configured for this runtime")?;
837        let tenant_ctx = self.state_tenant_ctx(ctx)?;
838        let state_key = greentic_state::StateKey::new(&key);
839        store
840            .set_json(
841                &tenant_ctx,
842                crate::storage::state::STATE_PREFIX,
843                &state_key,
844                None,
845                &value,
846                None,
847            )
848            .with_context(|| format!("state.set failed for key `{key}`"))?;
849        let payload = serde_json::json!({ "key": key, "value": value });
850        Ok(NodeOutput::new(payload))
851    }
852
853    fn pack_for_flow(&self, ctx: &FlowContext<'_>) -> Result<&Arc<PackRuntime>> {
854        let key = FlowKey {
855            pack_id: ctx.pack_id.to_string(),
856            flow_id: ctx.flow_id.to_string(),
857        };
858        let idx = self.flow_sources.get(&key).with_context(|| {
859            format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
860        })?;
861        Ok(&self.packs[*idx])
862    }
863
864    fn extract_state_key_helper(payload: &Value) -> Result<String> {
865        payload
866            .get("key")
867            .and_then(Value::as_str)
868            .map(String::from)
869            .filter(|k| !k.is_empty())
870            .context("state node payload missing required `key` (non-empty string)")
871    }
872
873    fn state_tenant_ctx(&self, ctx: &FlowContext<'_>) -> Result<greentic_types::TenantCtx> {
874        let env = greentic_types::EnvId::from_str(&self.default_env)
875            .with_context(|| format!("invalid env id `{}`", self.default_env))?;
876        let tenant = greentic_types::TenantId::from_str(ctx.tenant)
877            .with_context(|| format!("invalid tenant id `{}`", ctx.tenant))?;
878        Ok(greentic_types::TenantCtx::new(env, tenant))
879    }
880
881    async fn apply_jump(
882        &self,
883        ctx: &FlowContext<'_>,
884        state: &mut ExecutionState,
885        jump: JumpControl,
886    ) -> Result<JumpTarget> {
887        let target_flow = jump.flow.trim();
888        if target_flow.is_empty() {
889            bail!("missing_flow");
890        }
891
892        let flow = self
893            .get_or_load_flow(ctx.pack_id, target_flow)
894            .await
895            .with_context(|| format!("unknown_flow:{target_flow}"))?;
896
897        let target_node = if let Some(node) = jump.node.as_deref() {
898            let parsed = NodeId::from_str(node).with_context(|| format!("unknown_node:{node}"))?;
899            if !flow.nodes.contains_key(&parsed) {
900                bail!("unknown_node:{node}");
901            }
902            parsed
903        } else {
904            flow.start
905                .clone()
906                .or_else(|| flow.nodes.keys().next().cloned())
907                .ok_or_else(|| anyhow!("jump_failed: flow {target_flow} has no start node"))?
908        };
909
910        let max_redirects = jump.max_redirects.unwrap_or(3);
911        if state.redirect_count() >= max_redirects {
912            bail!("redirect_limit");
913        }
914        state.increment_redirect_count();
915        state.replace_input(jump.payload.clone());
916        state.last_output = Some(jump.payload);
917        tracing::info!(
918            flow_id = %ctx.flow_id,
919            target_flow = %target_flow,
920            target_node = %target_node.as_str(),
921            reason = ?jump.reason,
922            redirects = state.redirect_count(),
923            "flow.jump.applied"
924        );
925
926        Ok(JumpTarget {
927            flow_id: target_flow.to_string(),
928            flow,
929            node_id: target_node,
930        })
931    }
932
933    async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
934        #[derive(Deserialize)]
935        struct FlowCallPayload {
936            #[serde(alias = "flow")]
937            flow_id: String,
938            #[serde(default)]
939            input: Value,
940        }
941
942        let call: FlowCallPayload =
943            serde_json::from_value(payload).context("invalid payload for flow.call node")?;
944        if call.flow_id.trim().is_empty() {
945            bail!("flow.call requires a non-empty flow_id");
946        }
947
948        let sub_input = if call.input.is_null() {
949            Value::Null
950        } else {
951            call.input
952        };
953
954        let flow_id_owned = call.flow_id;
955        let action = "flow.call";
956        let sub_ctx = FlowContext {
957            tenant: ctx.tenant,
958            pack_id: ctx.pack_id,
959            flow_id: flow_id_owned.as_str(),
960            node_id: None,
961            tool: ctx.tool,
962            action: Some(action),
963            session_id: ctx.session_id,
964            provider_id: ctx.provider_id,
965            retry_config: ctx.retry_config,
966            attempt: ctx.attempt,
967            observer: ctx.observer,
968            mocks: ctx.mocks,
969        };
970
971        let execution = Box::pin(self.execute(sub_ctx, sub_input))
972            .await
973            .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
974        match execution.status {
975            FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
976            FlowStatus::Waiting(wait) => bail!(
977                "flow.call cannot pause (flow {} waiting {:?})",
978                flow_id_owned,
979                wait.reason
980            ),
981        }
982    }
983
984    async fn execute_component_exec(
985        &self,
986        ctx: &FlowContext<'_>,
987        node_id: &str,
988        node: &HostNode,
989        payload: Value,
990        event: &NodeEvent<'_>,
991        overrides: ComponentOverrides<'_>,
992    ) -> Result<NodeOutput> {
993        #[derive(Deserialize)]
994        struct ComponentPayload {
995            #[serde(default, alias = "component_ref", alias = "component")]
996            component: Option<String>,
997            #[serde(alias = "op")]
998            operation: Option<String>,
999            #[serde(default)]
1000            input: Value,
1001            #[serde(default)]
1002            config: Value,
1003        }
1004
1005        let payload: ComponentPayload =
1006            serde_json::from_value(payload).context("invalid payload for component.exec")?;
1007        let component_ref = overrides
1008            .component
1009            .map(str::to_string)
1010            .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
1011            .with_context(|| "component.exec requires a component_ref")?;
1012        let operation = resolve_component_operation(
1013            node_id,
1014            node.component_id.as_str(),
1015            payload.operation,
1016            overrides.operation,
1017            node.operation_in_mapping.as_deref(),
1018        )?;
1019
1020        let call = ComponentCall {
1021            component_ref,
1022            operation,
1023            input: payload.input,
1024            config: payload.config,
1025        };
1026
1027        self.invoke_component_call(ctx, node_id, call, event).await
1028    }
1029
1030    async fn execute_component_call(
1031        &self,
1032        ctx: &FlowContext<'_>,
1033        node_id: &str,
1034        node: &HostNode,
1035        payload: Value,
1036        component_ref: &str,
1037        event: &NodeEvent<'_>,
1038    ) -> Result<NodeOutput> {
1039        let payload_operation = extract_operation_from_mapping(&payload);
1040        let (input, config) = split_operation_payload(payload);
1041        let operation = resolve_component_operation(
1042            node_id,
1043            node.component_id.as_str(),
1044            payload_operation,
1045            node.operation_name.as_deref(),
1046            node.operation_in_mapping.as_deref(),
1047        )?;
1048        let call = ComponentCall {
1049            component_ref: component_ref.to_string(),
1050            operation,
1051            input,
1052            config,
1053        };
1054        self.invoke_component_call(ctx, node_id, call, event).await
1055    }
1056
1057    async fn invoke_component_call(
1058        &self,
1059        ctx: &FlowContext<'_>,
1060        node_id: &str,
1061        mut call: ComponentCall,
1062        event: &NodeEvent<'_>,
1063    ) -> Result<NodeOutput> {
1064        self.validate_component(ctx, event, &call)?;
1065        let key = FlowKey {
1066            pack_id: ctx.pack_id.to_string(),
1067            flow_id: ctx.flow_id.to_string(),
1068        };
1069        let pack_idx = *self.flow_sources.get(&key).with_context(|| {
1070            format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
1071        })?;
1072        let pack = Arc::clone(&self.packs[pack_idx]);
1073
1074        // Promote adaptive-card defaults from node config (default_card_asset /
1075        // default_card_inline / default_source) into the invocation, so the
1076        // component receives a valid `card_spec` field even when the user input
1077        // is empty (e.g. webchat ConversationStart with no text). Without this,
1078        // schema validation in the component reports AC_INVOCATION_MISSING_FIELD
1079        // and the renderer falls back to a generic "Welcome" placeholder.
1080        promote_card_config_to_invocation(&mut call.input, &call.config);
1081
1082        // Pre-resolve card asset paths: read JSON files from the pack's assets
1083        // directory and inject as inline_json so the component doesn't need
1084        // WASI filesystem access.
1085        resolve_card_assets(&mut call.input, &pack);
1086
1087        // When the input is a card-like invocation (has card_source/card_spec),
1088        // pass it directly to the component instead of wrapping in an
1089        // InvocationEnvelope.  The envelope serialises the payload field as a
1090        // byte array which the component cannot decode back, and the
1091        // InvocationPayload::parse heuristic strips domain fields when a
1092        // `payload` key is present (e.g.  the card's Handlebars template
1093        // context `payload: {}`).
1094        let is_card = is_card_invocation(&call.input);
1095
1096        let input_json = if is_card {
1097            serde_json::to_string(&call.input)?
1098        } else {
1099            // Runtime owns ctx; flows must not embed ctx, even if they provide envelopes.
1100            let meta = InvocationMeta {
1101                env: &self.default_env,
1102                tenant: ctx.tenant,
1103                flow_id: ctx.flow_id,
1104                node_id: Some(node_id),
1105                provider_id: ctx.provider_id,
1106                session_id: ctx.session_id,
1107                attempt: ctx.attempt,
1108            };
1109            let invocation_envelope =
1110                build_invocation_envelope(meta, call.operation.as_str(), call.input)
1111                    .context("build invocation envelope for component call")?;
1112            serde_json::to_string(&invocation_envelope)?
1113        };
1114        let config_json = if call.config.is_null() {
1115            None
1116        } else {
1117            Some(serde_json::to_string(&call.config)?)
1118        };
1119
1120        let exec_ctx = component_exec_ctx(ctx, node_id);
1121        #[cfg(feature = "fault-injection")]
1122        {
1123            let fault_ctx = FaultContext {
1124                pack_id: ctx.pack_id,
1125                flow_id: ctx.flow_id,
1126                node_id: Some(node_id),
1127                attempt: ctx.attempt,
1128            };
1129            maybe_fail(FaultPoint::BeforeComponentCall, fault_ctx)
1130                .map_err(|err| anyhow!(err.to_string()))?;
1131        }
1132        let value = pack
1133            .invoke_component(
1134                call.component_ref.as_str(),
1135                exec_ctx,
1136                call.operation.as_str(),
1137                config_json,
1138                input_json,
1139            )
1140            .await?;
1141        #[cfg(feature = "fault-injection")]
1142        {
1143            let fault_ctx = FaultContext {
1144                pack_id: ctx.pack_id,
1145                flow_id: ctx.flow_id,
1146                node_id: Some(node_id),
1147                attempt: ctx.attempt,
1148            };
1149            maybe_fail(FaultPoint::AfterComponentCall, fault_ctx)
1150                .map_err(|err| anyhow!(err.to_string()))?;
1151        }
1152
1153        if let Some((code, message)) = component_error(&value) {
1154            bail!(
1155                "component {} failed: {}: {}",
1156                call.component_ref,
1157                code,
1158                message
1159            );
1160        }
1161        // MCP-shaped tool errors (greentic-mcp-generator's tool_error_with_status)
1162        // come back as a top-level `{ "error": { "code", "message", "status" } }`
1163        // value with the WIT envelope still ok=true (because the wasm guest
1164        // returned normally). Treat them the same as a component_error so the
1165        // engine error-envelope lift path surfaces the failure to the user.
1166        if let Some((code, message)) = mcp_tool_error(&value) {
1167            bail!(
1168                "component {} returned tool error: {}: {}",
1169                call.component_ref,
1170                code,
1171                message
1172            );
1173        }
1174        Ok(NodeOutput::new(value))
1175    }
1176
1177    async fn execute_provider_invoke(
1178        &self,
1179        ctx: &FlowContext<'_>,
1180        node_id: &str,
1181        state: &ExecutionState,
1182        payload: Value,
1183        event: &NodeEvent<'_>,
1184    ) -> Result<NodeOutput> {
1185        #[derive(Deserialize)]
1186        struct ProviderPayload {
1187            #[serde(default)]
1188            provider_id: Option<String>,
1189            #[serde(default)]
1190            provider_type: Option<String>,
1191            #[serde(default, alias = "operation")]
1192            op: Option<String>,
1193            #[serde(default)]
1194            input: Value,
1195            #[serde(default)]
1196            in_map: Value,
1197            #[serde(default)]
1198            out_map: Value,
1199            #[serde(default)]
1200            err_map: Value,
1201        }
1202
1203        let payload: ProviderPayload =
1204            serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
1205        let op = payload
1206            .op
1207            .as_deref()
1208            .filter(|v| !v.trim().is_empty())
1209            .with_context(|| "provider.invoke requires an op")?
1210            .to_string();
1211
1212        let prev = state
1213            .last_output
1214            .as_ref()
1215            .cloned()
1216            .unwrap_or_else(|| Value::Object(JsonMap::new()));
1217        let base_ctx = template_context(state, prev);
1218
1219        let input_value = if !payload.in_map.is_null() {
1220            let mut ctx_value = base_ctx.clone();
1221            if let Value::Object(ref mut map) = ctx_value {
1222                map.insert("input".into(), payload.input.clone());
1223                map.insert("result".into(), payload.input.clone());
1224            }
1225            render_template_value(
1226                &payload.in_map,
1227                &ctx_value,
1228                TemplateOptions {
1229                    allow_pointer: true,
1230                },
1231            )
1232            .context("failed to render provider.invoke in_map")?
1233        } else if !payload.input.is_null() {
1234            payload.input
1235        } else {
1236            Value::Null
1237        };
1238        let input_json = serde_json::to_vec(&input_value)?;
1239
1240        self.validate_tool(
1241            ctx,
1242            event,
1243            payload.provider_id.as_deref(),
1244            payload.provider_type.as_deref(),
1245            &op,
1246            &input_value,
1247        )?;
1248
1249        let key = FlowKey {
1250            pack_id: ctx.pack_id.to_string(),
1251            flow_id: ctx.flow_id.to_string(),
1252        };
1253        let pack_idx = *self.flow_sources.get(&key).with_context(|| {
1254            format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
1255        })?;
1256        let pack = Arc::clone(&self.packs[pack_idx]);
1257        let binding = pack.resolve_provider(
1258            payload.provider_id.as_deref(),
1259            payload.provider_type.as_deref(),
1260        );
1261
1262        // If pack-local resolution fails, try the cross-pack resolver (capability registry).
1263        if binding.is_err()
1264            && let Some(output) = self.try_invoke_cross_pack_resolver(
1265                payload.provider_id.as_deref(),
1266                payload.provider_type.as_deref(),
1267                &op,
1268                &input_json,
1269                ctx.tenant,
1270            )?
1271        {
1272            return Ok(output);
1273        }
1274
1275        let binding = binding?;
1276        let exec_ctx = component_exec_ctx(ctx, node_id);
1277        #[cfg(feature = "fault-injection")]
1278        {
1279            let fault_ctx = FaultContext {
1280                pack_id: ctx.pack_id,
1281                flow_id: ctx.flow_id,
1282                node_id: Some(node_id),
1283                attempt: ctx.attempt,
1284            };
1285            maybe_fail(FaultPoint::BeforeToolCall, fault_ctx)
1286                .map_err(|err| anyhow!(err.to_string()))?;
1287        }
1288        let provider_metric_id = payload
1289            .provider_id
1290            .as_deref()
1291            .or(payload.provider_type.as_deref())
1292            .unwrap_or("unknown");
1293        let invoke_started = std::time::Instant::now();
1294        let invoke_result = pack
1295            .invoke_provider(&binding, exec_ctx, &op, input_json)
1296            .await;
1297        let invoke_duration_ms = invoke_started.elapsed().as_secs_f64() * 1000.0;
1298        crate::metrics::record_provider_invocation(
1299            ctx.tenant,
1300            provider_metric_id,
1301            &op,
1302            if invoke_result.is_ok() { "ok" } else { "err" },
1303            invoke_duration_ms,
1304        );
1305        let result = invoke_result?;
1306        #[cfg(feature = "fault-injection")]
1307        {
1308            let fault_ctx = FaultContext {
1309                pack_id: ctx.pack_id,
1310                flow_id: ctx.flow_id,
1311                node_id: Some(node_id),
1312                attempt: ctx.attempt,
1313            };
1314            maybe_fail(FaultPoint::AfterToolCall, fault_ctx)
1315                .map_err(|err| anyhow!(err.to_string()))?;
1316        }
1317
1318        let output = if payload.out_map.is_null() {
1319            result
1320        } else {
1321            let mut ctx_value = base_ctx;
1322            if let Value::Object(ref mut map) = ctx_value {
1323                map.insert("input".into(), result.clone());
1324                map.insert("result".into(), result.clone());
1325            }
1326            render_template_value(
1327                &payload.out_map,
1328                &ctx_value,
1329                TemplateOptions {
1330                    allow_pointer: true,
1331                },
1332            )
1333            .context("failed to render provider.invoke out_map")?
1334        };
1335        let _ = payload.err_map;
1336        Ok(NodeOutput::new(output))
1337    }
1338
1339    fn try_invoke_cross_pack_resolver(
1340        &self,
1341        provider_id: Option<&str>,
1342        provider_type: Option<&str>,
1343        op: &str,
1344        input_json: &[u8],
1345        tenant: &str,
1346    ) -> Result<Option<NodeOutput>> {
1347        eprintln!(
1348            "[DEBUG] provider.invoke: pack-local failed, has_resolver={}",
1349            self.cross_pack_resolver.is_some()
1350        );
1351        let Some(resolver) = self.cross_pack_resolver.as_ref() else {
1352            return Ok(None);
1353        };
1354        let provider_id = provider_id.unwrap_or("unknown");
1355        tracing::info!(
1356            provider_id,
1357            op = %op,
1358            "provider.invoke: pack-local resolution failed, trying cross-pack resolver"
1359        );
1360        let result_value =
1361            resolver.invoke(provider_id, provider_type, op, input_json, tenant, None)?;
1362        Ok(Some(NodeOutput::new(result_value)))
1363    }
1364
1365    fn validate_component(
1366        &self,
1367        ctx: &FlowContext<'_>,
1368        event: &NodeEvent<'_>,
1369        call: &ComponentCall,
1370    ) -> Result<()> {
1371        if self.validation.mode == ValidationMode::Off {
1372            return Ok(());
1373        }
1374        let mut metadata = JsonMap::new();
1375        metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
1376        if let Some(id) = ctx.session_id {
1377            metadata.insert("session".to_string(), json!({ "id": id }));
1378        }
1379        let envelope = json!({
1380            "component_id": call.component_ref,
1381            "operation": call.operation,
1382            "input": call.input,
1383            "config": call.config,
1384            "metadata": Value::Object(metadata),
1385        });
1386        let issues = validate_component_envelope(&envelope);
1387        self.report_validation(ctx, event, "component", issues)
1388    }
1389
1390    fn validate_tool(
1391        &self,
1392        ctx: &FlowContext<'_>,
1393        event: &NodeEvent<'_>,
1394        provider_id: Option<&str>,
1395        provider_type: Option<&str>,
1396        operation: &str,
1397        input: &Value,
1398    ) -> Result<()> {
1399        if self.validation.mode == ValidationMode::Off {
1400            return Ok(());
1401        }
1402        let tool_id = provider_id.or(provider_type).unwrap_or("provider.invoke");
1403        let mut metadata = JsonMap::new();
1404        metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
1405        if let Some(id) = ctx.session_id {
1406            metadata.insert("session".to_string(), json!({ "id": id }));
1407        }
1408        let envelope = json!({
1409            "tool_id": tool_id,
1410            "operation": operation,
1411            "input": input,
1412            "metadata": Value::Object(metadata),
1413        });
1414        let issues = validate_tool_envelope(&envelope);
1415        self.report_validation(ctx, event, "tool", issues)
1416    }
1417
1418    fn report_validation(
1419        &self,
1420        ctx: &FlowContext<'_>,
1421        event: &NodeEvent<'_>,
1422        kind: &str,
1423        issues: Vec<ValidationIssue>,
1424    ) -> Result<()> {
1425        if issues.is_empty() {
1426            return Ok(());
1427        }
1428        if let Some(observer) = ctx.observer {
1429            observer.on_validation(event, &issues);
1430        }
1431        match self.validation.mode {
1432            ValidationMode::Warn => {
1433                tracing::warn!(
1434                    tenant = ctx.tenant,
1435                    flow_id = ctx.flow_id,
1436                    node_id = event.node_id,
1437                    kind,
1438                    issues = ?issues,
1439                    "invocation envelope validation issues"
1440                );
1441                Ok(())
1442            }
1443            ValidationMode::Error => {
1444                tracing::error!(
1445                    tenant = ctx.tenant,
1446                    flow_id = ctx.flow_id,
1447                    node_id = event.node_id,
1448                    kind,
1449                    issues = ?issues,
1450                    "invocation envelope validation failed"
1451                );
1452                bail!("invocation_validation_failed");
1453            }
1454            ValidationMode::Off => Ok(()),
1455        }
1456    }
1457
1458    pub fn flows(&self) -> &[FlowDescriptor] {
1459        &self.flows
1460    }
1461
1462    pub fn flow_by_key(&self, pack_id: &str, flow_id: &str) -> Option<&FlowDescriptor> {
1463        self.flows
1464            .iter()
1465            .find(|descriptor| descriptor.pack_id == pack_id && descriptor.id == flow_id)
1466    }
1467
1468    pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
1469        let mut matches = self
1470            .flows
1471            .iter()
1472            .filter(|descriptor| descriptor.flow_type == flow_type);
1473        let first = matches.next()?;
1474        if matches.next().is_some() {
1475            return None;
1476        }
1477        Some(first)
1478    }
1479
1480    pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
1481        let mut matches = self
1482            .flows
1483            .iter()
1484            .filter(|descriptor| descriptor.id == flow_id);
1485        let first = matches.next()?;
1486        if matches.next().is_some() {
1487            return None;
1488        }
1489        Some(first)
1490    }
1491}
1492
1493pub trait ExecutionObserver: Send + Sync {
1494    fn on_node_start(&self, event: &NodeEvent<'_>);
1495    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
1496    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
1497    fn on_validation(&self, _event: &NodeEvent<'_>, _issues: &[ValidationIssue]) {}
1498}
1499
1500pub struct NodeEvent<'a> {
1501    pub context: &'a FlowContext<'a>,
1502    pub node_id: &'a str,
1503    pub node: &'a HostNode,
1504    pub payload: &'a Value,
1505}
1506
1507#[derive(Clone, Debug, Serialize, Deserialize)]
1508pub struct ExecutionState {
1509    #[serde(default)]
1510    entry: Value,
1511    #[serde(default)]
1512    input: Value,
1513    #[serde(default)]
1514    nodes: HashMap<String, NodeOutput>,
1515    #[serde(default)]
1516    egress: Vec<Value>,
1517    #[serde(default, skip_serializing_if = "Option::is_none")]
1518    last_output: Option<Value>,
1519    #[serde(default)]
1520    redirect_count: u32,
1521}
1522
1523impl ExecutionState {
1524    fn new(input: Value) -> Self {
1525        Self {
1526            entry: input.clone(),
1527            input,
1528            nodes: HashMap::new(),
1529            egress: Vec::new(),
1530            last_output: None,
1531            redirect_count: 0,
1532        }
1533    }
1534
1535    /// Refresh `entry` from `input` if the snapshot was loaded without an
1536    /// entry value. Kept for backwards compatibility with snapshots
1537    /// persisted before the entry-refresh fix in `FlowEngine::resume`.
1538    #[allow(dead_code)]
1539    fn ensure_entry(&mut self) {
1540        if self.entry.is_null() {
1541            self.entry = self.input.clone();
1542        }
1543    }
1544
1545    fn context(&self) -> Value {
1546        let mut nodes = JsonMap::new();
1547        for (id, output) in &self.nodes {
1548            nodes.insert(
1549                id.clone(),
1550                json!({
1551                    "ok": output.ok,
1552                    "payload": output.payload.clone(),
1553                    "meta": output.meta.clone(),
1554                }),
1555            );
1556        }
1557        json!({
1558            "entry": self.entry.clone(),
1559            "input": self.input.clone(),
1560            "nodes": nodes,
1561            "redirect_count": self.redirect_count,
1562        })
1563    }
1564
1565    fn outputs_map(&self) -> JsonMap<String, Value> {
1566        let mut outputs = JsonMap::new();
1567        for (id, output) in &self.nodes {
1568            outputs.insert(id.clone(), output.payload.clone());
1569        }
1570        outputs
1571    }
1572    fn push_egress(&mut self, payload: Value) {
1573        self.egress.push(payload);
1574    }
1575
1576    fn replace_input(&mut self, input: Value) {
1577        self.input = input;
1578    }
1579
1580    fn clear_egress(&mut self) {
1581        self.egress.clear();
1582    }
1583
1584    fn redirect_count(&self) -> u32 {
1585        self.redirect_count
1586    }
1587
1588    fn increment_redirect_count(&mut self) {
1589        self.redirect_count = self.redirect_count.saturating_add(1);
1590    }
1591
1592    fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
1593        if self.egress.is_empty() {
1594            return final_payload.unwrap_or(Value::Null);
1595        }
1596        let mut emitted = std::mem::take(&mut self.egress);
1597        if let Some(value) = final_payload {
1598            match value {
1599                Value::Null => {}
1600                Value::Array(items) => emitted.extend(items),
1601                other => emitted.push(other),
1602            }
1603        }
1604        Value::Array(emitted)
1605    }
1606}
1607
1608#[derive(Clone, Debug, Serialize, Deserialize)]
1609struct NodeOutput {
1610    ok: bool,
1611    payload: Value,
1612    meta: Value,
1613}
1614
1615impl NodeOutput {
1616    fn new(payload: Value) -> Self {
1617        Self {
1618            ok: true,
1619            payload,
1620            meta: Value::Null,
1621        }
1622    }
1623
1624    /// `ok=false` output stashing error context in `meta.error`. Currently
1625    /// only used by `lift_first_node_error_from_nodes` tests — kept around so
1626    /// drive_flow can resume populating it once we have a hook for it.
1627    #[allow(dead_code)]
1628    fn with_error(node_id: &str, err: &(dyn std::error::Error + 'static)) -> Self {
1629        Self {
1630            ok: false,
1631            payload: Value::Null,
1632            meta: json!({
1633                "error": {
1634                    "kind": "flow_node_failed",
1635                    "message": err.to_string(),
1636                    "node_id": node_id,
1637                }
1638            }),
1639        }
1640    }
1641}
1642
1643struct DispatchOutcome {
1644    output: NodeOutput,
1645    control: NodeControl,
1646}
1647
1648impl DispatchOutcome {
1649    fn complete(output: NodeOutput) -> Self {
1650        Self {
1651            output,
1652            control: NodeControl::Continue,
1653        }
1654    }
1655
1656    fn wait(output: NodeOutput, reason: Option<String>) -> Self {
1657        Self {
1658            output,
1659            control: NodeControl::Wait { reason },
1660        }
1661    }
1662
1663    fn with_control(output: NodeOutput, control: NodeControl) -> Self {
1664        Self { output, control }
1665    }
1666}
1667
1668#[derive(Clone, Debug)]
1669enum NodeControl {
1670    Continue,
1671    Wait {
1672        reason: Option<String>,
1673    },
1674    Jump(JumpControl),
1675    Respond {
1676        text: Option<String>,
1677        card_cbor: Option<Vec<u8>>,
1678        needs_user: Option<bool>,
1679    },
1680}
1681
1682#[derive(Clone, Debug)]
1683struct JumpControl {
1684    flow: String,
1685    node: Option<String>,
1686    payload: Value,
1687    hints: Value,
1688    max_redirects: Option<u32>,
1689    reason: Option<String>,
1690}
1691
1692#[derive(Clone, Debug)]
1693struct JumpTarget {
1694    flow_id: String,
1695    flow: HostFlow,
1696    node_id: NodeId,
1697}
1698
1699impl NodeOutput {
1700    fn with_meta(payload: Value, meta: Value) -> Self {
1701        Self {
1702            ok: true,
1703            payload,
1704            meta,
1705        }
1706    }
1707}
1708
1709fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
1710    ComponentExecCtx {
1711        tenant: ComponentTenantCtx {
1712            tenant: ctx.tenant.to_string(),
1713            team: None,
1714            user: ctx.provider_id.map(str::to_string),
1715            trace_id: None,
1716            i18n_id: None,
1717            correlation_id: ctx.session_id.map(str::to_string),
1718            deadline_unix_ms: None,
1719            attempt: ctx.attempt,
1720            idempotency_key: ctx.session_id.map(str::to_string),
1721        },
1722        i18n_id: None,
1723        flow_id: ctx.flow_id.to_string(),
1724        node_id: Some(node_id.to_string()),
1725    }
1726}
1727
1728fn component_error(value: &Value) -> Option<(String, String)> {
1729    let obj = value.as_object()?;
1730    let ok = obj.get("ok").and_then(Value::as_bool)?;
1731    if ok {
1732        return None;
1733    }
1734    let err = obj.get("error")?.as_object()?;
1735    let code = err
1736        .get("code")
1737        .and_then(Value::as_str)
1738        .unwrap_or("component_error");
1739    let message = err
1740        .get("message")
1741        .and_then(Value::as_str)
1742        .unwrap_or("component reported error");
1743    Some((code.to_string(), message.to_string()))
1744}
1745
1746/// MCP tool-error wire shape from greentic-mcp-generator's `tool_error_with_status`:
1747/// `{ "error": { "code", "message", "status" } }`. The component returned ok=true at
1748/// the WIT level (the HTTP failure was caught and serialized), so the regular
1749/// component_error path doesn't catch it.
1750fn mcp_tool_error(value: &Value) -> Option<(String, String)> {
1751    let obj = value.as_object()?;
1752    // Must be the error shape: no `result` field, just `error`.
1753    if obj.contains_key("result") {
1754        return None;
1755    }
1756    let err = obj.get("error")?.as_object()?;
1757    let code = err
1758        .get("code")
1759        .and_then(Value::as_str)
1760        .unwrap_or("tool_error");
1761    let raw_message = err
1762        .get("message")
1763        .and_then(Value::as_str)
1764        .unwrap_or("tool returned an error");
1765    let status = err.get("status").and_then(Value::as_u64);
1766    let message = match status {
1767        Some(s) => format!("{raw_message} (status {s})"),
1768        None => raw_message.to_string(),
1769    };
1770    Some((code.to_string(), message))
1771}
1772
1773fn extract_wait_reason(payload: &Value) -> Option<String> {
1774    match payload {
1775        Value::String(s) => Some(s.clone()),
1776        Value::Object(map) => map
1777            .get("reason")
1778            .and_then(Value::as_str)
1779            .map(|value| value.to_string()),
1780        _ => None,
1781    }
1782}
1783
1784fn component_dispatch_outcome(output: NodeOutput) -> Result<DispatchOutcome> {
1785    if let Some(control) = parse_component_control(&output.payload)? {
1786        return Ok(match control {
1787            NodeControl::Jump(jump) => {
1788                let adjusted = NodeOutput::with_meta(jump.payload.clone(), jump.hints.clone());
1789                DispatchOutcome::with_control(adjusted, NodeControl::Jump(jump))
1790            }
1791            NodeControl::Respond {
1792                text,
1793                card_cbor,
1794                needs_user,
1795            } => DispatchOutcome::with_control(
1796                output,
1797                NodeControl::Respond {
1798                    text,
1799                    card_cbor,
1800                    needs_user,
1801                },
1802            ),
1803            other => DispatchOutcome::with_control(output, other),
1804        });
1805    }
1806    Ok(DispatchOutcome::complete(output))
1807}
1808
1809fn parse_component_control(payload: &Value) -> Result<Option<NodeControl>> {
1810    let Value::Object(map) = payload else {
1811        return Ok(None);
1812    };
1813    let Some(control_value) = map.get("greentic_control") else {
1814        return Ok(None);
1815    };
1816    let control = control_value
1817        .as_object()
1818        .ok_or_else(|| anyhow!("jump_failed: greentic_control must be an object"))?;
1819    let action = control
1820        .get("action")
1821        .and_then(Value::as_str)
1822        .ok_or_else(|| anyhow!("jump_failed: greentic_control.action is required"))?;
1823    let version = control
1824        .get("v")
1825        .and_then(Value::as_u64)
1826        .ok_or_else(|| anyhow!("jump_failed: greentic_control.v is required"))?;
1827    if version != 1 {
1828        bail!("jump_failed: unsupported greentic_control.v={version}");
1829    }
1830
1831    match action {
1832        "jump" => {
1833            let flow = control
1834                .get("flow")
1835                .and_then(Value::as_str)
1836                .map(str::trim)
1837                .filter(|value| !value.is_empty())
1838                .ok_or_else(|| anyhow!("jump_failed: jump flow is required"))?
1839                .to_string();
1840            let node = control
1841                .get("node")
1842                .and_then(Value::as_str)
1843                .map(str::trim)
1844                .filter(|value| !value.is_empty())
1845                .map(str::to_string);
1846            let payload = control.get("payload").cloned().unwrap_or(Value::Null);
1847            let hints = control.get("hints").cloned().unwrap_or(Value::Null);
1848            let max_redirects = control
1849                .get("max_redirects")
1850                .and_then(Value::as_u64)
1851                .and_then(|value| u32::try_from(value).ok());
1852            let reason = control
1853                .get("reason")
1854                .and_then(Value::as_str)
1855                .map(str::to_string);
1856            Ok(Some(NodeControl::Jump(JumpControl {
1857                flow,
1858                node,
1859                payload,
1860                hints,
1861                max_redirects,
1862                reason,
1863            })))
1864        }
1865        "respond" => {
1866            let text = control
1867                .get("text")
1868                .and_then(Value::as_str)
1869                .map(str::to_string);
1870            let card_cbor = control
1871                .get("card_cbor")
1872                .and_then(Value::as_array)
1873                .map(|bytes| {
1874                    bytes
1875                        .iter()
1876                        .filter_map(Value::as_u64)
1877                        .filter_map(|value| u8::try_from(value).ok())
1878                        .collect::<Vec<_>>()
1879                });
1880            let needs_user = control.get("needs_user").and_then(Value::as_bool);
1881            Ok(Some(NodeControl::Respond {
1882                text,
1883                card_cbor,
1884                needs_user,
1885            }))
1886        }
1887        _ => Ok(None),
1888    }
1889}
1890
1891fn template_context(state: &ExecutionState, prev: Value) -> Value {
1892    let entry = if state.entry.is_null() {
1893        Value::Object(JsonMap::new())
1894    } else {
1895        state.entry.clone()
1896    };
1897    let mut ctx = JsonMap::new();
1898    ctx.insert("entry".into(), entry.clone());
1899    ctx.insert("in".into(), entry); // alias for entry - used in flow templates
1900    ctx.insert("prev".into(), prev);
1901    ctx.insert("node".into(), Value::Object(state.outputs_map()));
1902    ctx.insert("state".into(), state.context());
1903    Value::Object(ctx)
1904}
1905
1906impl From<Flow> for HostFlow {
1907    fn from(value: Flow) -> Self {
1908        let mut nodes = IndexMap::new();
1909        for (id, node) in value.nodes {
1910            nodes.insert(id.clone(), HostNode::from(node));
1911        }
1912        let start = value
1913            .entrypoints
1914            .get("default")
1915            .and_then(Value::as_str)
1916            .and_then(|id| NodeId::from_str(id).ok())
1917            .or_else(|| nodes.keys().next().cloned());
1918        // Extract flow-level slot_schema from metadata.extra (Phase D).
1919        // The producer side (greentic-flow compile_flow) stores it under
1920        // "greentic.slot_schema" when the FlowDoc has a `slot_schema` field.
1921        let slot_schema = value
1922            .metadata
1923            .extra
1924            .get(SLOT_SCHEMA_METADATA_KEY)
1925            .filter(|v| !v.is_null())
1926            .cloned();
1927        Self {
1928            id: value.id.as_str().to_string(),
1929            start,
1930            nodes,
1931            slot_schema,
1932        }
1933    }
1934}
1935
1936impl From<Node> for HostNode {
1937    fn from(node: Node) -> Self {
1938        let full_ref = node.component.id.as_str().to_string();
1939        // When the pack compiler stores "component.operation" as a single ID
1940        // without a separate operation field, split on the last dot.
1941        let is_builtin = full_ref.starts_with("component.exec")
1942            || full_ref.starts_with("flow.")
1943            || full_ref.starts_with("emit.")
1944            || full_ref.starts_with("session.")
1945            || full_ref.starts_with("provider.");
1946        let (component_ref, raw_operation) = if node.component.operation.is_some() || is_builtin {
1947            (full_ref, node.component.operation.clone())
1948        } else if let Some(dot) = full_ref.rfind('.') {
1949            let comp = full_ref[..dot].to_string();
1950            let op = full_ref[dot + 1..].to_string();
1951            (comp, Some(op))
1952        } else {
1953            (full_ref, None)
1954        };
1955        let operation_in_mapping = extract_operation_from_mapping(&node.input.mapping);
1956        let operation_is_component_exec = raw_operation.as_deref() == Some("component.exec");
1957        let operation_is_emit = raw_operation
1958            .as_deref()
1959            .map(|op| op.starts_with("emit."))
1960            .unwrap_or(false);
1961        let is_component_exec = component_ref == "component.exec" || operation_is_component_exec;
1962
1963        let kind = if is_component_exec {
1964            let target = if component_ref == "component.exec" {
1965                if let Some(op) = raw_operation
1966                    .as_deref()
1967                    .filter(|op| op.starts_with("emit."))
1968                {
1969                    op.to_string()
1970                } else {
1971                    extract_target_component(&node.input.mapping)
1972                        .unwrap_or_else(|| "component.exec".to_string())
1973                }
1974            } else {
1975                extract_target_component(&node.input.mapping)
1976                    .unwrap_or_else(|| component_ref.clone())
1977            };
1978            if target.starts_with("emit.") {
1979                NodeKind::BuiltinEmit {
1980                    kind: emit_kind_from_ref(&target),
1981                }
1982            } else {
1983                NodeKind::Exec {
1984                    target_component: target,
1985                }
1986            }
1987        } else if operation_is_emit {
1988            NodeKind::BuiltinEmit {
1989                kind: emit_kind_from_ref(raw_operation.as_deref().unwrap_or("emit.log")),
1990            }
1991        } else {
1992            match component_ref.as_str() {
1993                "flow.call" => NodeKind::FlowCall,
1994                "provider.invoke" => NodeKind::ProviderInvoke,
1995                "session.wait" => NodeKind::Wait,
1996                "state.get" => NodeKind::BuiltinStateGet,
1997                "state.set" => NodeKind::BuiltinStateSet,
1998                comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
1999                    kind: emit_kind_from_ref(comp),
2000                },
2001                other => NodeKind::PackComponent {
2002                    component_ref: other.to_string(),
2003                },
2004            }
2005        };
2006        let component_label = match &kind {
2007            NodeKind::Exec { .. } => "component.exec".to_string(),
2008            NodeKind::PackComponent { component_ref } => component_ref.clone(),
2009            NodeKind::ProviderInvoke => "provider.invoke".to_string(),
2010            NodeKind::FlowCall => "flow.call".to_string(),
2011            NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
2012            NodeKind::BuiltinStateGet => "state.get".to_string(),
2013            NodeKind::BuiltinStateSet => "state.set".to_string(),
2014            NodeKind::Wait => "session.wait".to_string(),
2015        };
2016        let operation_name = if is_component_exec && operation_is_component_exec {
2017            None
2018        } else {
2019            raw_operation.clone()
2020        };
2021        let payload_expr = match kind {
2022            NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
2023            _ => node.input.mapping.clone(),
2024        };
2025        Self {
2026            kind,
2027            component: component_label,
2028            component_id: if is_component_exec {
2029                "component.exec".to_string()
2030            } else {
2031                component_ref
2032            },
2033            operation_name,
2034            operation_in_mapping,
2035            payload_expr,
2036            routing: node.routing,
2037        }
2038    }
2039}
2040
2041fn extract_target_component(payload: &Value) -> Option<String> {
2042    match payload {
2043        Value::Object(map) => map
2044            .get("component")
2045            .or_else(|| map.get("component_ref"))
2046            .and_then(Value::as_str)
2047            .map(|s| s.to_string()),
2048        _ => None,
2049    }
2050}
2051
2052fn extract_operation_from_mapping(payload: &Value) -> Option<String> {
2053    match payload {
2054        Value::Object(map) => map
2055            .get("operation")
2056            .or_else(|| map.get("op"))
2057            .and_then(Value::as_str)
2058            .map(str::trim)
2059            .filter(|value| !value.is_empty())
2060            .map(|value| value.to_string()),
2061        _ => None,
2062    }
2063}
2064
2065fn extract_emit_payload(payload: &Value) -> Value {
2066    if let Value::Object(map) = payload {
2067        if let Some(input) = map.get("input") {
2068            return input.clone();
2069        }
2070        if let Some(inner) = map.get("payload") {
2071            return inner.clone();
2072        }
2073    }
2074    payload.clone()
2075}
2076
2077fn split_operation_payload(payload: Value) -> (Value, Value) {
2078    if let Value::Object(mut map) = payload.clone()
2079        && map.contains_key("input")
2080    {
2081        let input = map.remove("input").unwrap_or(Value::Null);
2082        let config = map.remove("config").unwrap_or(Value::Null);
2083        let legacy_only = map.keys().all(|key| {
2084            matches!(
2085                key.as_str(),
2086                "operation" | "op" | "component" | "component_ref"
2087            )
2088        });
2089        if legacy_only {
2090            return (input, config);
2091        }
2092    }
2093    (payload, Value::Null)
2094}
2095
2096fn resolve_component_operation(
2097    node_id: &str,
2098    component_label: &str,
2099    payload_operation: Option<String>,
2100    operation_override: Option<&str>,
2101    operation_in_mapping: Option<&str>,
2102) -> Result<String> {
2103    if let Some(op) = operation_override
2104        .map(str::trim)
2105        .filter(|value| !value.is_empty())
2106    {
2107        return Ok(op.to_string());
2108    }
2109
2110    if let Some(op) = payload_operation
2111        .as_deref()
2112        .map(str::trim)
2113        .filter(|value| !value.is_empty())
2114    {
2115        return Ok(op.to_string());
2116    }
2117
2118    let mut message = format!(
2119        "missing operation for node `{}` (component `{}`); expected node.component.operation to be set",
2120        node_id, component_label,
2121    );
2122    if let Some(found) = operation_in_mapping {
2123        message.push_str(&format!(
2124            ". Found operation in input.mapping (`{}`) but this is not used; pack compiler must preserve node.component.operation.",
2125            found
2126        ));
2127    }
2128    bail!(message);
2129}
2130
2131fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
2132    match component_ref {
2133        "emit.log" => EmitKind::Log,
2134        "emit.response" => EmitKind::Response,
2135        other => EmitKind::Other(other.to_string()),
2136    }
2137}
2138
2139fn emit_ref_from_kind(kind: &EmitKind) -> String {
2140    match kind {
2141        EmitKind::Log => "emit.log".to_string(),
2142        EmitKind::Response => "emit.response".to_string(),
2143        EmitKind::Other(other) => other.clone(),
2144    }
2145}
2146
2147/// Returns `true` when `input` looks like an Adaptive Card invocation
2148/// (contains `card_source` or `card_spec` at the top level).
2149fn is_card_invocation(input: &Value) -> bool {
2150    if let Value::Object(map) = input {
2151        return map.contains_key("card_source") || map.contains_key("card_spec");
2152    }
2153    false
2154}
2155
2156/// When the node config declares adaptive-card defaults (`default_card_asset`,
2157/// `default_card_inline`, or `default_source`) but the runtime invocation has
2158/// no `card_source`/`card_spec` yet, lift those defaults into the invocation.
2159/// This produces a schema-valid invocation envelope so the component does not
2160/// fall back to its generic "Welcome" placeholder.
2161///
2162/// Adaptive-card defaults can arrive in either of two places depending on how
2163/// the pack was compiled:
2164/// - top-level `call.config` (post `split_operation_payload`)
2165/// - nested `call.input.config` (when the node mapping kept the
2166///   `{component, config}` shape and `split_operation_payload` left it intact)
2167fn promote_card_config_to_invocation(input: &mut Value, config: &Value) {
2168    if is_card_invocation(input) {
2169        return;
2170    }
2171
2172    let cfg_map = card_defaults_source(input, config);
2173    let Some(cfg) = cfg_map else { return };
2174
2175    let default_asset = cfg
2176        .get("default_card_asset")
2177        .and_then(Value::as_str)
2178        .map(str::trim)
2179        .filter(|value| !value.is_empty())
2180        .map(str::to_string);
2181    let default_inline = cfg
2182        .get("default_card_inline")
2183        .filter(|value| value.is_object() || value.is_array())
2184        .cloned();
2185    let default_source = cfg
2186        .get("default_source")
2187        .and_then(Value::as_str)
2188        .map(str::trim)
2189        .filter(|value| !value.is_empty())
2190        .map(str::to_lowercase);
2191
2192    if default_asset.is_none() && default_inline.is_none() && default_source.is_none() {
2193        return;
2194    }
2195
2196    let card_source = default_source.unwrap_or_else(|| {
2197        if default_inline.is_some() {
2198            "inline".to_string()
2199        } else {
2200            "asset".to_string()
2201        }
2202    });
2203
2204    let mut card_spec = serde_json::Map::new();
2205    match card_source.as_str() {
2206        "asset" => {
2207            if let Some(path) = default_asset {
2208                card_spec.insert("asset_path".into(), Value::String(path));
2209            }
2210        }
2211        "inline" => {
2212            if let Some(inline) = default_inline {
2213                card_spec.insert("inline_json".into(), inline);
2214            }
2215        }
2216        _ => {}
2217    }
2218
2219    if !matches!(input, Value::Object(_)) {
2220        *input = Value::Object(serde_json::Map::new());
2221    }
2222    if let Value::Object(map) = input {
2223        map.insert("card_source".into(), Value::String(card_source));
2224        map.insert("card_spec".into(), Value::Object(card_spec));
2225    }
2226}
2227
2228/// Locate the adaptive-card defaults config object, preferring the top-level
2229/// `call.config` when present, then falling back to a nested `input.config`
2230/// (the shape produced when `split_operation_payload` leaves the mapping
2231/// intact).
2232fn card_defaults_source<'a>(
2233    input: &'a Value,
2234    config: &'a Value,
2235) -> Option<&'a serde_json::Map<String, Value>> {
2236    if let Value::Object(map) = config {
2237        return Some(map);
2238    }
2239    if let Value::Object(map) = input
2240        && let Some(Value::Object(nested)) = map.get("config")
2241    {
2242        return Some(nested);
2243    }
2244    None
2245}
2246
2247fn inject_card_locale(payload: &mut Value, entry: &Value) {
2248    if !is_card_invocation(payload) {
2249        return;
2250    }
2251    let Value::Object(map) = payload else { return };
2252    if map.contains_key("locale") {
2253        return;
2254    }
2255    let locale = entry
2256        .pointer("/input/metadata/locale")
2257        .or_else(|| entry.pointer("/metadata/locale"))
2258        .and_then(Value::as_str);
2259    if let Some(locale) = locale {
2260        map.insert("locale".into(), Value::String(locale.to_string()));
2261    }
2262}
2263
2264/// Inject flow-level `slot_schema` as `slot_definitions` into the
2265/// slot-extractor component's input value. Skips injection when the input
2266/// already contains an explicit `slot_definitions` key (back-compat with
2267/// M2.4 NDA demo inline definitions). When the input is `Null`, promotes it
2268/// to an empty object first.
2269fn inject_slot_definitions(input: &mut Value, slot_schema: &Value, flow_id: &str, node_id: &str) {
2270    if input.is_null() {
2271        *input = Value::Object(serde_json::Map::new());
2272    }
2273    let Some(map) = input.as_object_mut() else {
2274        tracing::warn!(
2275            flow_id,
2276            node_id,
2277            "slot-extractor input is not an object; cannot inject slot_definitions"
2278        );
2279        return;
2280    };
2281    if map.contains_key("slot_definitions") {
2282        return;
2283    }
2284    let slot_count = slot_schema.as_array().map_or(0, Vec::len);
2285    tracing::debug!(
2286        flow_id,
2287        slot_count,
2288        "injecting flow-level slot_schema as slot_definitions into slot-extractor input"
2289    );
2290    map.insert("slot_definitions".to_string(), slot_schema.clone());
2291}
2292
2293/// Pre-resolve `card_source: "asset"` entries by reading the referenced JSON
2294/// file from the pack's assets directory and converting to
2295/// `card_source: "inline"` with `inline_json` populated.
2296///
2297/// This handles both top-level card fields and the nested `call.payload`
2298/// structure emitted by cards2pack.
2299fn resolve_card_assets(input: &mut Value, pack: &crate::pack::PackRuntime) {
2300    resolve_card_spec_asset(input, pack);
2301
2302    // Also resolve inside `call.payload` (cards2pack duplicates the card
2303    // invocation there).
2304    if let Value::Object(map) = input
2305        && let Some(Value::Object(call)) = map.get_mut("call")
2306        && let Some(payload) = call.get_mut("payload")
2307    {
2308        resolve_card_spec_asset(payload, pack);
2309    }
2310}
2311
2312/// Resolve a single card_spec asset_path → inline_json.
2313fn resolve_card_spec_asset(value: &mut Value, pack: &crate::pack::PackRuntime) {
2314    let Value::Object(map) = value else { return };
2315
2316    let is_asset = map
2317        .get("card_source")
2318        .and_then(Value::as_str)
2319        .map(|s| s.eq_ignore_ascii_case("asset"))
2320        .unwrap_or(false);
2321    if !is_asset {
2322        return;
2323    }
2324
2325    let asset_path = map
2326        .get("card_spec")
2327        .and_then(|spec| spec.get("asset_path"))
2328        .and_then(Value::as_str)
2329        .map(str::to_string);
2330
2331    let Some(asset_path) = asset_path else { return };
2332
2333    match pack.read_asset(&asset_path) {
2334        Ok(bytes) => {
2335            let card_json: Value = match serde_json::from_slice(&bytes) {
2336                Ok(v) => v,
2337                Err(err) => {
2338                    tracing::warn!(
2339                        asset_path,
2340                        %err,
2341                        "failed to parse card asset as JSON; leaving as asset reference"
2342                    );
2343                    return;
2344                }
2345            };
2346            tracing::debug!(asset_path, "pre-resolved card asset to inline_json");
2347            map.insert("card_source".into(), Value::String("inline".into()));
2348            if let Some(Value::Object(spec)) = map.get_mut("card_spec") {
2349                spec.insert("inline_json".into(), card_json);
2350                spec.remove("asset_path");
2351            }
2352        }
2353        Err(err) => {
2354            tracing::warn!(
2355                asset_path,
2356                %err,
2357                "card asset not found in pack; leaving as asset reference"
2358            );
2359        }
2360    }
2361
2362    // Pre-resolve i18n bundle: the WASM component cannot read pack assets
2363    // directly (no host resolver registered), so inline the i18n JSON into
2364    // the invocation under `card_spec.i18n_inline`. Defense-in-depth: when
2365    // the card omits an explicit `i18n_bundle_path` we still try the
2366    // conventional `assets/i18n/` location so cards that rely on
2367    // auto-generated i18n keys (e.g. cards2pack output) keep working.
2368    let configured_bundle_path = map
2369        .get("card_spec")
2370        .and_then(|spec| spec.get("i18n_bundle_path"))
2371        .and_then(Value::as_str)
2372        .map(|s| s.trim().trim_end_matches('/').to_string())
2373        .filter(|s| !s.is_empty());
2374
2375    let bundle_path = configured_bundle_path
2376        .clone()
2377        .unwrap_or_else(|| "assets/i18n".to_string());
2378
2379    let i18n_entries = load_i18n_bundle_entries(&bundle_path, |path| pack.read_asset(path));
2380
2381    if !i18n_entries.is_empty() {
2382        let locale_keys: Vec<_> = i18n_entries.keys().cloned().collect();
2383        if let Some(Value::Object(spec)) = map.get_mut("card_spec") {
2384            spec.insert("i18n_inline".into(), Value::Object(i18n_entries));
2385            if configured_bundle_path.is_some() {
2386                tracing::info!(%bundle_path, ?locale_keys, "pre-resolved i18n bundle into card_spec.i18n_inline");
2387            } else {
2388                tracing::info!(%bundle_path, ?locale_keys, "auto-discovered i18n bundle and inlined into card_spec.i18n_inline");
2389            }
2390        }
2391    }
2392}
2393
2394fn load_i18n_bundle_entries<F>(bundle_path: &str, mut read_asset: F) -> JsonMap<String, Value>
2395where
2396    F: FnMut(&str) -> Result<Vec<u8>>,
2397{
2398    let mut i18n_entries = JsonMap::new();
2399
2400    if bundle_path.ends_with(".json") {
2401        if let Ok(bytes) = read_asset(bundle_path)
2402            && let Ok(Value::Object(entries)) = serde_json::from_slice::<Value>(&bytes)
2403        {
2404            i18n_entries.insert("en".to_string(), Value::Object(entries));
2405        }
2406        return i18n_entries;
2407    }
2408
2409    let manifest_path = format!("{bundle_path}/_manifest.json");
2410    let locale_codes: Vec<String> = read_asset(&manifest_path)
2411        .ok()
2412        .and_then(|bytes| serde_json::from_slice::<Value>(&bytes).ok())
2413        .and_then(|value| {
2414            let locales = value
2415                .get("locales")
2416                .and_then(Value::as_array)
2417                .cloned()
2418                .or_else(|| value.as_array().cloned());
2419            locales.map(|items| {
2420                items
2421                    .iter()
2422                    .filter_map(Value::as_str)
2423                    .map(String::from)
2424                    .collect()
2425            })
2426        })
2427        .unwrap_or_default();
2428
2429    tracing::info!(%bundle_path, ?locale_codes, "i18n manifest discovered locales");
2430
2431    for locale in &locale_codes {
2432        let candidate = format!("{bundle_path}/{locale}.json");
2433        if let Ok(bytes) = read_asset(&candidate)
2434            && let Ok(Value::Object(entries)) = serde_json::from_slice::<Value>(&bytes)
2435        {
2436            i18n_entries.insert(locale.clone(), Value::Object(entries));
2437        }
2438    }
2439    if !i18n_entries.contains_key("en") {
2440        let en_path = format!("{bundle_path}/en.json");
2441        if let Ok(bytes) = read_asset(&en_path)
2442            && let Ok(Value::Object(entries)) = serde_json::from_slice::<Value>(&bytes)
2443        {
2444            i18n_entries.insert("en".to_string(), Value::Object(entries));
2445        }
2446    }
2447
2448    i18n_entries
2449}
2450
2451/// Outcome of `evaluate_custom_routing` for a node's `Routing::Custom` array.
2452///
2453/// `Next` advances the flow to the named target. `End` terminates the run.
2454/// `Wait` pauses the run at the current node so the next inbound activity
2455/// resumes here and re-evaluates the routing with the new context — this is
2456/// what allows messaging flows (welcome → ... → confirm) to behave like a
2457/// live conversation instead of restarting at the entry point on every
2458/// click.
2459#[derive(Debug)]
2460pub(crate) enum CustomRoutingDecision {
2461    Next(NodeId),
2462    End,
2463    Wait,
2464}
2465
2466/// Evaluate a node's `Routing::Custom` array against the current execution
2467/// context.
2468///
2469/// Parses `Routing::Custom(Value)` as an array of `{condition, to}` objects.
2470/// Conditions are simple equality expressions like `response.action == "about"`.
2471/// Falls back to the first route without a condition (default route).
2472///
2473/// The evaluation context includes:
2474/// - All fields from the node output payload (top-level)
2475/// - `entry` / `in` — the original flow entry (incoming message)
2476/// - `response` — synthesized from entry metadata for convenient condition checks
2477///   (e.g. `response.action` maps to `metadata.action` from the incoming envelope)
2478fn evaluate_custom_routing(
2479    raw: &Value,
2480    output: &NodeOutput,
2481    state: &ExecutionState,
2482    flow_ir: &HostFlow,
2483    node_id: &NodeId,
2484) -> CustomRoutingDecision {
2485    let routes = match raw.as_array() {
2486        Some(arr) => arr,
2487        None => {
2488            tracing::warn!(
2489                flow_id = %flow_ir.id,
2490                node_id = %node_id,
2491                "custom routing is not an array; terminating"
2492            );
2493            return CustomRoutingDecision::End;
2494        }
2495    };
2496
2497    // Build a rich context for condition evaluation:
2498    // Start with output payload, then overlay entry and synthesised "response".
2499    let ctx = build_routing_context(output, state);
2500
2501    let mut has_condition = false;
2502    for route in routes {
2503        let condition = route.get("condition").and_then(|v| v.as_str());
2504        let to = route.get("to").and_then(|v| v.as_str());
2505
2506        if let Some(cond) = condition {
2507            has_condition = true;
2508            if evaluate_simple_condition(cond, &ctx)
2509                && let Some(target) = to
2510                && let Ok(nid) = NodeId::new(target)
2511            {
2512                tracing::debug!(
2513                    flow_id = %flow_ir.id,
2514                    node_id = %node_id,
2515                    condition = cond,
2516                    target = target,
2517                    "conditional route matched"
2518                );
2519                return CustomRoutingDecision::Next(nid);
2520            }
2521        } else if let Some(target) = to
2522            && let Ok(nid) = NodeId::new(target)
2523        {
2524            tracing::debug!(
2525                flow_id = %flow_ir.id,
2526                node_id = %node_id,
2527                target = target,
2528                "default route taken"
2529            );
2530            return CustomRoutingDecision::Next(nid);
2531        }
2532    }
2533
2534    // Fall-through. When the routing array contained at least one
2535    // conditional entry, treat the unmatched fall-through as a pause: the
2536    // user's next submission should be re-evaluated against this same
2537    // node's routing rather than restarting the flow from the entry point.
2538    // Routing arrays with no conditions at all (pure unconditional `out`
2539    // terminators) remain true ends.
2540    if has_condition {
2541        tracing::debug!(
2542            flow_id = %flow_ir.id,
2543            node_id = %node_id,
2544            "no conditional route matched; pausing run at current node for resume"
2545        );
2546        CustomRoutingDecision::Wait
2547    } else {
2548        tracing::warn!(
2549            flow_id = %flow_ir.id,
2550            node_id = %node_id,
2551            "no route matched and no conditions present; terminating"
2552        );
2553        CustomRoutingDecision::End
2554    }
2555}
2556
2557/// Evaluate a simple condition expression like `response.action == "about"`.
2558///
2559/// Supports dotted path lookups against a JSON value context.
2560/// Format: `<path> == "<value>"` or `<path> != "<value>"`
2561fn evaluate_simple_condition(condition: &str, ctx: &Value) -> bool {
2562    // Parse: `path == "value"` or `path != "value"`
2563    let (path, expected, negate) = if let Some(idx) = condition.find("==") {
2564        let path = condition[..idx].trim();
2565        let val = condition[idx + 2..].trim().trim_matches('"');
2566        (path, val, false)
2567    } else if let Some(idx) = condition.find("!=") {
2568        let path = condition[..idx].trim();
2569        let val = condition[idx + 2..].trim().trim_matches('"');
2570        (path, val, true)
2571    } else {
2572        return false;
2573    };
2574
2575    // Resolve dotted path against context (case-insensitive comparison)
2576    let actual = resolve_dotted_path(ctx, path);
2577    let matches = actual
2578        .as_deref()
2579        .is_some_and(|a| a.eq_ignore_ascii_case(expected));
2580    if negate { !matches } else { matches }
2581}
2582
2583/// Resolve a dotted path like `response.action` against a JSON value.
2584fn resolve_dotted_path(value: &Value, path: &str) -> Option<String> {
2585    let parts: Vec<&str> = path.split('.').collect();
2586    let mut current = value;
2587    for part in &parts {
2588        current = current.get(part)?;
2589    }
2590    match current {
2591        Value::String(s) => Some(s.clone()),
2592        Value::Bool(b) => Some(b.to_string()),
2593        Value::Number(n) => Some(n.to_string()),
2594        _ => Some(current.to_string()),
2595    }
2596}
2597
2598/// Build a context object for routing condition evaluation.
2599///
2600/// The context merges the node output with the flow entry so that conditions
2601/// can reference both component results and incoming message data.
2602///
2603/// Layout:
2604/// ```text
2605/// {
2606///   ...output.payload...,     // top-level fields from component output
2607///   "entry": <flow entry>,
2608///   "in":    <flow entry>,    // alias
2609///   "response": {             // synthesised from envelope metadata
2610///     <key>: <value>,         // e.g. "action": "about"
2611///     ...
2612///   }
2613/// }
2614/// ```
2615fn build_routing_context(output: &NodeOutput, state: &ExecutionState) -> Value {
2616    let mut ctx = match &output.payload {
2617        Value::Object(map) => map.clone(),
2618        _ => JsonMap::new(),
2619    };
2620
2621    let entry = &state.entry;
2622    ctx.insert("entry".into(), entry.clone());
2623    ctx.insert("in".into(), entry.clone());
2624
2625    // Synthesise "response" from the envelope metadata.
2626    // greentic-start demo path: entry.input.metadata.*
2627    // greentic-runner direct path: entry.metadata.*
2628    let metadata = entry
2629        .pointer("/input/metadata")
2630        .or_else(|| entry.pointer("/metadata"));
2631
2632    let mut response = JsonMap::new();
2633    if let Some(Value::Object(meta)) = metadata {
2634        for (k, v) in meta {
2635            // Flatten string values; stringify others
2636            match v {
2637                Value::String(s) => {
2638                    response.insert(k.clone(), Value::String(s.clone()));
2639                }
2640                other => {
2641                    response.insert(k.clone(), other.clone());
2642                }
2643            }
2644        }
2645    }
2646    // Also pull text from the envelope for convenience
2647    if let Some(text) = entry
2648        .pointer("/input/text")
2649        .or_else(|| entry.pointer("/text"))
2650        .filter(|t| !t.is_null())
2651    {
2652        response.insert("text".into(), text.clone());
2653    }
2654    ctx.insert("response".into(), Value::Object(response));
2655
2656    Value::Object(ctx)
2657}
2658
2659#[cfg(test)]
2660mod tests {
2661    use super::*;
2662    use crate::validate::{ValidationConfig, ValidationMode};
2663    use greentic_types::{
2664        Flow, FlowComponentRef, FlowId, FlowKind, InputMapping, Node, NodeId, OutputMapping,
2665        Routing, TelemetryHints,
2666    };
2667    use serde_json::json;
2668    use std::collections::{BTreeMap, HashMap as StdHashMap};
2669    use std::str::FromStr;
2670    use std::sync::Mutex;
2671    use tokio::runtime::Runtime;
2672
2673    fn minimal_engine() -> FlowEngine {
2674        FlowEngine {
2675            packs: Vec::new(),
2676            flows: Vec::new(),
2677            flow_sources: HashMap::new(),
2678            flow_cache: RwLock::new(HashMap::new()),
2679            default_env: "local".to_string(),
2680            validation: ValidationConfig {
2681                mode: ValidationMode::Off,
2682            },
2683            cross_pack_resolver: None,
2684            rollout_ids: RolloutIds::default(),
2685        }
2686    }
2687
2688    #[test]
2689    fn templating_renders_with_partials_and_data() {
2690        let mut state = ExecutionState::new(json!({ "city": "London" }));
2691        state.nodes.insert(
2692            "forecast".to_string(),
2693            NodeOutput::new(json!({ "temp": "20C" })),
2694        );
2695
2696        // templating context includes node outputs for runner-side payload rendering.
2697        let ctx = state.context();
2698        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
2699    }
2700
2701    #[test]
2702    fn finalize_wraps_emitted_payloads() {
2703        let mut state = ExecutionState::new(json!({}));
2704        state.push_egress(json!({ "text": "first" }));
2705        state.push_egress(json!({ "text": "second" }));
2706        let result = state.finalize_with(Some(json!({ "text": "final" })));
2707        assert_eq!(
2708            result,
2709            json!([
2710                { "text": "first" },
2711                { "text": "second" },
2712                { "text": "final" }
2713            ])
2714        );
2715    }
2716
2717    #[test]
2718    fn finalize_flattens_final_array() {
2719        let mut state = ExecutionState::new(json!({}));
2720        state.push_egress(json!({ "text": "only" }));
2721        let result = state.finalize_with(Some(json!([
2722            { "text": "extra-1" },
2723            { "text": "extra-2" }
2724        ])));
2725        assert_eq!(
2726            result,
2727            json!([
2728                { "text": "only" },
2729                { "text": "extra-1" },
2730                { "text": "extra-2" }
2731            ])
2732        );
2733    }
2734
2735    #[test]
2736    fn inject_card_locale_uses_entry_metadata_without_overwriting_payload() {
2737        let mut payload = json!({
2738            "card_source": "inline",
2739            "card_spec": { "title": "Hello" }
2740        });
2741        inject_card_locale(
2742            &mut payload,
2743            &json!({"input": {"metadata": {"locale": "nl-NL"}}}),
2744        );
2745        assert_eq!(payload["locale"], json!("nl-NL"));
2746
2747        let mut existing = json!({
2748            "card_source": "inline",
2749            "card_spec": { "title": "Hello" },
2750            "locale": "en-GB"
2751        });
2752        inject_card_locale(&mut existing, &json!({"metadata": {"locale": "nl-NL"}}));
2753        assert_eq!(existing["locale"], json!("en-GB"));
2754    }
2755
2756    #[test]
2757    fn load_i18n_bundle_entries_reads_manifest_and_falls_back_to_en() {
2758        let assets = StdHashMap::from([
2759            (
2760                "cards/i18n/_manifest.json".to_string(),
2761                br#"{"locales":["de"]}"#.to_vec(),
2762            ),
2763            (
2764                "cards/i18n/de.json".to_string(),
2765                br#"{"title":"Hallo"}"#.to_vec(),
2766            ),
2767            (
2768                "cards/i18n/en.json".to_string(),
2769                br#"{"title":"Hello"}"#.to_vec(),
2770            ),
2771        ]);
2772
2773        let entries = load_i18n_bundle_entries("cards/i18n", |path| {
2774            assets
2775                .get(path)
2776                .cloned()
2777                .with_context(|| format!("missing asset {path}"))
2778        });
2779
2780        assert_eq!(entries["de"]["title"], json!("Hallo"));
2781        assert_eq!(entries["en"]["title"], json!("Hello"));
2782    }
2783
2784    #[test]
2785    fn load_i18n_bundle_entries_reads_single_file_bundle() {
2786        let entries = load_i18n_bundle_entries("cards/i18n.json", |path| {
2787            if path == "cards/i18n.json" {
2788                Ok(br#"{"title":"Hello"}"#.to_vec())
2789            } else {
2790                bail!("unexpected asset {path}");
2791            }
2792        });
2793
2794        assert_eq!(entries["en"]["title"], json!("Hello"));
2795    }
2796
2797    struct TestCrossPackResolver;
2798
2799    impl CrossPackResolver for TestCrossPackResolver {
2800        fn invoke(
2801            &self,
2802            provider_id: &str,
2803            provider_type: Option<&str>,
2804            op: &str,
2805            input: &[u8],
2806            tenant: &str,
2807            team: Option<&str>,
2808        ) -> Result<Value> {
2809            Ok(json!({
2810                "provider_id": provider_id,
2811                "provider_type": provider_type,
2812                "op": op,
2813                "tenant": tenant,
2814                "team": team,
2815                "input": serde_json::from_slice::<Value>(input)?,
2816            }))
2817        }
2818    }
2819
2820    #[test]
2821    fn cross_pack_resolver_returns_node_output_when_present() {
2822        let mut engine = minimal_engine();
2823        engine.set_cross_pack_resolver(Arc::new(TestCrossPackResolver));
2824
2825        let output = engine
2826            .try_invoke_cross_pack_resolver(
2827                Some("mail"),
2828                Some("messaging"),
2829                "send",
2830                br#"{"subject":"hello"}"#,
2831                "demo",
2832            )
2833            .expect("resolver invocation")
2834            .expect("resolver output");
2835
2836        assert_eq!(
2837            output.payload,
2838            json!({
2839                "provider_id": "mail",
2840                "provider_type": "messaging",
2841                "op": "send",
2842                "tenant": "demo",
2843                "team": null,
2844                "input": { "subject": "hello" },
2845            })
2846        );
2847    }
2848
2849    #[test]
2850    fn parse_component_control_ignores_plain_payload() {
2851        let payload = json!({
2852            "flow": "not-a-control-field",
2853            "node": "n1"
2854        });
2855        let control = parse_component_control(&payload).expect("parse control");
2856        assert!(control.is_none());
2857    }
2858
2859    #[test]
2860    fn parse_component_control_parses_jump_marker() {
2861        let payload = json!({
2862            "greentic_control": {
2863                "action": "jump",
2864                "v": 1,
2865                "flow": "flow.b",
2866                "node": "node-2",
2867                "payload": { "message": "hi" },
2868                "hints": { "k": "v" },
2869                "max_redirects": 2,
2870                "reason": "handoff"
2871            }
2872        });
2873        let control = parse_component_control(&payload)
2874            .expect("parse control")
2875            .expect("missing control");
2876        match control {
2877            NodeControl::Jump(jump) => {
2878                assert_eq!(jump.flow, "flow.b");
2879                assert_eq!(jump.node.as_deref(), Some("node-2"));
2880                assert_eq!(jump.payload, json!({ "message": "hi" }));
2881                assert_eq!(jump.hints, json!({ "k": "v" }));
2882                assert_eq!(jump.max_redirects, Some(2));
2883                assert_eq!(jump.reason.as_deref(), Some("handoff"));
2884            }
2885            other => panic!("expected jump control, got {other:?}"),
2886        }
2887    }
2888
2889    #[test]
2890    fn parse_component_control_rejects_invalid_marker() {
2891        let payload = json!({
2892            "greentic_control": "bad-shape"
2893        });
2894        let err = parse_component_control(&payload).expect_err("expected invalid marker error");
2895        assert!(err.to_string().contains("greentic_control"));
2896    }
2897
2898    #[test]
2899    fn missing_operation_reports_node_and_component() {
2900        let engine = minimal_engine();
2901        let rt = Runtime::new().unwrap();
2902        let retry_config = RetryConfig {
2903            max_attempts: 1,
2904            base_delay_ms: 1,
2905        };
2906        let ctx = FlowContext {
2907            tenant: "tenant",
2908            pack_id: "test-pack",
2909            flow_id: "flow",
2910            node_id: Some("missing-op"),
2911            tool: None,
2912            action: None,
2913            session_id: None,
2914            provider_id: None,
2915            retry_config,
2916            attempt: 1,
2917            observer: None,
2918            mocks: None,
2919        };
2920        let node = HostNode {
2921            kind: NodeKind::Exec {
2922                target_component: "qa.process".into(),
2923            },
2924            component: "component.exec".into(),
2925            component_id: "component.exec".into(),
2926            operation_name: None,
2927            operation_in_mapping: None,
2928            payload_expr: Value::Null,
2929            routing: Routing::End,
2930        };
2931        let _state = ExecutionState::new(Value::Null);
2932        let payload = json!({ "component": "qa.process" });
2933        let event = NodeEvent {
2934            context: &ctx,
2935            node_id: "missing-op",
2936            node: &node,
2937            payload: &payload,
2938        };
2939        let err = rt
2940            .block_on(engine.execute_component_exec(
2941                &ctx,
2942                "missing-op",
2943                &node,
2944                payload.clone(),
2945                &event,
2946                ComponentOverrides {
2947                    component: None,
2948                    operation: None,
2949                },
2950            ))
2951            .unwrap_err();
2952        let message = err.to_string();
2953        assert!(
2954            message.contains("missing operation for node `missing-op`"),
2955            "unexpected message: {message}"
2956        );
2957        assert!(
2958            message.contains("(component `component.exec`)"),
2959            "unexpected message: {message}"
2960        );
2961    }
2962
2963    #[test]
2964    fn missing_operation_mentions_mapping_hint() {
2965        let engine = minimal_engine();
2966        let rt = Runtime::new().unwrap();
2967        let retry_config = RetryConfig {
2968            max_attempts: 1,
2969            base_delay_ms: 1,
2970        };
2971        let ctx = FlowContext {
2972            tenant: "tenant",
2973            pack_id: "test-pack",
2974            flow_id: "flow",
2975            node_id: Some("missing-op-hint"),
2976            tool: None,
2977            action: None,
2978            session_id: None,
2979            provider_id: None,
2980            retry_config,
2981            attempt: 1,
2982            observer: None,
2983            mocks: None,
2984        };
2985        let node = HostNode {
2986            kind: NodeKind::Exec {
2987                target_component: "qa.process".into(),
2988            },
2989            component: "component.exec".into(),
2990            component_id: "component.exec".into(),
2991            operation_name: None,
2992            operation_in_mapping: Some("render".into()),
2993            payload_expr: Value::Null,
2994            routing: Routing::End,
2995        };
2996        let _state = ExecutionState::new(Value::Null);
2997        let payload = json!({ "component": "qa.process" });
2998        let event = NodeEvent {
2999            context: &ctx,
3000            node_id: "missing-op-hint",
3001            node: &node,
3002            payload: &payload,
3003        };
3004        let err = rt
3005            .block_on(engine.execute_component_exec(
3006                &ctx,
3007                "missing-op-hint",
3008                &node,
3009                payload.clone(),
3010                &event,
3011                ComponentOverrides {
3012                    component: None,
3013                    operation: None,
3014                },
3015            ))
3016            .unwrap_err();
3017        let message = err.to_string();
3018        assert!(
3019            message.contains("missing operation for node `missing-op-hint`"),
3020            "unexpected message: {message}"
3021        );
3022        assert!(
3023            message.contains("Found operation in input.mapping (`render`)"),
3024            "unexpected message: {message}"
3025        );
3026    }
3027
3028    struct CountingObserver {
3029        starts: Mutex<Vec<String>>,
3030        ends: Mutex<Vec<Value>>,
3031    }
3032
3033    impl CountingObserver {
3034        fn new() -> Self {
3035            Self {
3036                starts: Mutex::new(Vec::new()),
3037                ends: Mutex::new(Vec::new()),
3038            }
3039        }
3040    }
3041
3042    impl ExecutionObserver for CountingObserver {
3043        fn on_node_start(&self, event: &NodeEvent<'_>) {
3044            self.starts.lock().unwrap().push(event.node_id.to_string());
3045        }
3046
3047        fn on_node_end(&self, _event: &NodeEvent<'_>, output: &Value) {
3048            self.ends.lock().unwrap().push(output.clone());
3049        }
3050
3051        fn on_node_error(&self, _event: &NodeEvent<'_>, _error: &dyn StdError) {}
3052    }
3053
3054    #[test]
3055    fn emits_end_event_for_successful_node() {
3056        let node_id = NodeId::from_str("emit").unwrap();
3057        let node = Node {
3058            id: node_id.clone(),
3059            component: FlowComponentRef {
3060                id: "emit.log".parse().unwrap(),
3061                pack_alias: None,
3062                operation: None,
3063            },
3064            input: InputMapping {
3065                mapping: json!({ "message": "logged" }),
3066            },
3067            output: OutputMapping {
3068                mapping: Value::Null,
3069            },
3070            err_map: None,
3071            routing: Routing::End,
3072            telemetry: TelemetryHints::default(),
3073        };
3074        let mut nodes = indexmap::IndexMap::default();
3075        nodes.insert(node_id.clone(), node);
3076        let flow = Flow {
3077            schema_version: "1.0".into(),
3078            id: FlowId::from_str("emit.flow").unwrap(),
3079            kind: FlowKind::Messaging,
3080            entrypoints: BTreeMap::from([(
3081                "default".to_string(),
3082                Value::String(node_id.to_string()),
3083            )]),
3084            nodes,
3085            metadata: Default::default(),
3086        };
3087        let host_flow = HostFlow::from(flow);
3088
3089        let engine = FlowEngine {
3090            packs: Vec::new(),
3091            flows: Vec::new(),
3092            flow_sources: HashMap::new(),
3093            flow_cache: RwLock::new(HashMap::from([(
3094                FlowKey {
3095                    pack_id: "test-pack".to_string(),
3096                    flow_id: "emit.flow".to_string(),
3097                },
3098                host_flow,
3099            )])),
3100            default_env: "local".to_string(),
3101            validation: ValidationConfig {
3102                mode: ValidationMode::Off,
3103            },
3104            cross_pack_resolver: None,
3105            rollout_ids: RolloutIds::default(),
3106        };
3107        let observer = CountingObserver::new();
3108        let ctx = FlowContext {
3109            tenant: "demo",
3110            pack_id: "test-pack",
3111            flow_id: "emit.flow",
3112            node_id: None,
3113            tool: None,
3114            action: None,
3115            session_id: None,
3116            provider_id: None,
3117            retry_config: RetryConfig {
3118                max_attempts: 1,
3119                base_delay_ms: 1,
3120            },
3121            attempt: 1,
3122            observer: Some(&observer),
3123            mocks: None,
3124        };
3125
3126        let rt = Runtime::new().unwrap();
3127        let result = rt.block_on(engine.execute(ctx, Value::Null)).unwrap();
3128        assert!(matches!(result.status, FlowStatus::Completed));
3129
3130        let starts = observer.starts.lock().unwrap();
3131        let ends = observer.ends.lock().unwrap();
3132        assert_eq!(starts.len(), 1);
3133        assert_eq!(ends.len(), 1);
3134        assert_eq!(ends[0], json!({ "message": "logged" }));
3135    }
3136
3137    fn host_flow_for_test(
3138        flow_id: &str,
3139        node_ids: &[&str],
3140        default_start: Option<&str>,
3141    ) -> HostFlow {
3142        let mut nodes = indexmap::IndexMap::default();
3143        for node_id in node_ids {
3144            let id = NodeId::from_str(node_id).unwrap();
3145            let node = Node {
3146                id: id.clone(),
3147                component: FlowComponentRef {
3148                    id: "emit.log".parse().unwrap(),
3149                    pack_alias: None,
3150                    operation: None,
3151                },
3152                input: InputMapping {
3153                    mapping: json!({ "message": node_id }),
3154                },
3155                output: OutputMapping {
3156                    mapping: Value::Null,
3157                },
3158                err_map: None,
3159                routing: Routing::End,
3160                telemetry: TelemetryHints::default(),
3161            };
3162            nodes.insert(id, node);
3163        }
3164        let mut entrypoints = BTreeMap::new();
3165        if let Some(start) = default_start {
3166            entrypoints.insert("default".to_string(), Value::String(start.to_string()));
3167        }
3168        HostFlow::from(Flow {
3169            schema_version: "1.0".into(),
3170            id: FlowId::from_str(flow_id).unwrap(),
3171            kind: FlowKind::Messaging,
3172            entrypoints,
3173            nodes,
3174            metadata: Default::default(),
3175        })
3176    }
3177
3178    fn jump_test_engine() -> FlowEngine {
3179        let target_flow = host_flow_for_test("flow.target", &["node-a", "node-b"], None);
3180        FlowEngine {
3181            packs: Vec::new(),
3182            flows: Vec::new(),
3183            flow_sources: HashMap::new(),
3184            flow_cache: RwLock::new(HashMap::from([(
3185                FlowKey {
3186                    pack_id: "test-pack".to_string(),
3187                    flow_id: "flow.target".to_string(),
3188                },
3189                target_flow,
3190            )])),
3191            default_env: "local".to_string(),
3192            validation: ValidationConfig {
3193                mode: ValidationMode::Off,
3194            },
3195            cross_pack_resolver: None,
3196            rollout_ids: RolloutIds::default(),
3197        }
3198    }
3199
3200    fn jump_ctx<'a>(flow_id: &'a str) -> FlowContext<'a> {
3201        FlowContext {
3202            tenant: "demo",
3203            pack_id: "test-pack",
3204            flow_id,
3205            node_id: None,
3206            tool: None,
3207            action: None,
3208            session_id: None,
3209            provider_id: None,
3210            retry_config: RetryConfig {
3211                max_attempts: 1,
3212                base_delay_ms: 1,
3213            },
3214            attempt: 1,
3215            observer: None,
3216            mocks: None,
3217        }
3218    }
3219
3220    #[test]
3221    fn with_rollout_ids_binds_revision_identity() {
3222        let engine = minimal_engine().with_rollout_ids(RolloutIds {
3223            customer_id: Some("cust-acme".into()),
3224            deployment_id: Some("01JTKS".into()),
3225            bundle_id: Some("customer.support".into()),
3226            revision_id: Some("01JTKR".into()),
3227        });
3228        assert_eq!(engine.rollout_ids.revision_id.as_deref(), Some("01JTKR"));
3229        assert_eq!(engine.rollout_ids.deployment_id.as_deref(), Some("01JTKS"));
3230        // A freshly-built engine carries no rollout identity (legacy runtime).
3231        assert!(minimal_engine().rollout_ids.is_empty());
3232    }
3233
3234    #[test]
3235    fn apply_jump_unknown_flow_errors() {
3236        let engine = minimal_engine();
3237        let mut state = ExecutionState::new(Value::Null);
3238        let rt = Runtime::new().unwrap();
3239        let err = rt
3240            .block_on(engine.apply_jump(
3241                &jump_ctx("flow.source"),
3242                &mut state,
3243                JumpControl {
3244                    flow: "flow.missing".into(),
3245                    node: None,
3246                    payload: json!({ "ok": true }),
3247                    hints: Value::Null,
3248                    max_redirects: None,
3249                    reason: None,
3250                },
3251            ))
3252            .unwrap_err();
3253        assert!(
3254            err.to_string().contains("unknown_flow"),
3255            "unexpected error: {err}"
3256        );
3257    }
3258
3259    #[test]
3260    fn apply_jump_unknown_node_errors() {
3261        let engine = jump_test_engine();
3262        let mut state = ExecutionState::new(Value::Null);
3263        let rt = Runtime::new().unwrap();
3264        let err = rt
3265            .block_on(engine.apply_jump(
3266                &jump_ctx("flow.source"),
3267                &mut state,
3268                JumpControl {
3269                    flow: "flow.target".into(),
3270                    node: Some("node-missing".into()),
3271                    payload: json!({ "ok": true }),
3272                    hints: Value::Null,
3273                    max_redirects: None,
3274                    reason: None,
3275                },
3276            ))
3277            .unwrap_err();
3278        assert!(
3279            err.to_string().contains("unknown_node"),
3280            "unexpected error: {err}"
3281        );
3282    }
3283
3284    #[test]
3285    fn apply_jump_uses_default_start_fallback() {
3286        let engine = jump_test_engine();
3287        let mut state = ExecutionState::new(Value::Null);
3288        let rt = Runtime::new().unwrap();
3289        let target = rt
3290            .block_on(engine.apply_jump(
3291                &jump_ctx("flow.source"),
3292                &mut state,
3293                JumpControl {
3294                    flow: "flow.target".into(),
3295                    node: None,
3296                    payload: json!({ "k": "v" }),
3297                    hints: Value::Null,
3298                    max_redirects: None,
3299                    reason: None,
3300                },
3301            ))
3302            .expect("jump target");
3303        assert_eq!(target.flow_id, "flow.target");
3304        assert_eq!(target.node_id.as_str(), "node-a");
3305    }
3306
3307    #[test]
3308    fn apply_jump_redirect_limit_enforced() {
3309        let engine = jump_test_engine();
3310        let mut state = ExecutionState::new(Value::Null);
3311        state.redirect_count = 3;
3312        let rt = Runtime::new().unwrap();
3313        let err = rt
3314            .block_on(engine.apply_jump(
3315                &jump_ctx("flow.source"),
3316                &mut state,
3317                JumpControl {
3318                    flow: "flow.target".into(),
3319                    node: None,
3320                    payload: json!({ "k": "v" }),
3321                    hints: Value::Null,
3322                    max_redirects: Some(3),
3323                    reason: None,
3324                },
3325            ))
3326            .unwrap_err();
3327        assert_eq!(err.to_string(), "redirect_limit");
3328    }
3329
3330    /// Regression: a `Routing::Custom` array containing at least one
3331    /// conditional entry must pause (return `Wait`) when no condition
3332    /// matches, instead of terminating. Concrete bug it guards against:
3333    /// every card click used to terminate the flow because the entry-card's
3334    /// routing array didn't enumerate every downstream action, so users got
3335    /// looped back to the entry on every interaction.
3336    #[test]
3337    fn evaluate_custom_routing_waits_when_conditional_falls_through() {
3338        let raw_routing = json!([
3339            { "condition": "response.action == \"go\"", "to": "next" },
3340            { "out": true }
3341        ]);
3342        let flow_ir = HostFlow {
3343            id: "flow.test".to_string(),
3344            start: None,
3345            nodes: IndexMap::new(),
3346            slot_schema: None,
3347        };
3348        let current_node = NodeId::from_str("current").unwrap();
3349        let output = NodeOutput::new(Value::Null);
3350
3351        // First case: empty action -> conditional does not match, must wait.
3352        let mut state_empty = ExecutionState::new(json!({ "metadata": { "action": "" } }));
3353        state_empty.entry = json!({ "metadata": { "action": "" } });
3354        let decision_empty =
3355            evaluate_custom_routing(&raw_routing, &output, &state_empty, &flow_ir, &current_node);
3356        assert!(
3357            matches!(decision_empty, CustomRoutingDecision::Wait),
3358            "expected Wait on conditional fall-through, got {decision_empty:?}"
3359        );
3360
3361        // Second case: action == "go" -> conditional matches, must advance.
3362        let mut state_go = ExecutionState::new(json!({ "metadata": { "action": "go" } }));
3363        state_go.entry = json!({ "metadata": { "action": "go" } });
3364        let decision_go =
3365            evaluate_custom_routing(&raw_routing, &output, &state_go, &flow_ir, &current_node);
3366        match decision_go {
3367            CustomRoutingDecision::Next(nid) => assert_eq!(nid.as_str(), "next"),
3368            other => panic!("expected Next(\"next\"), got {other:?}"),
3369        }
3370    }
3371
3372    #[test]
3373    fn node_output_with_error_marks_ok_false_and_stashes_in_meta() {
3374        let err: Box<dyn std::error::Error + 'static> =
3375            Box::<dyn std::error::Error + 'static>::from("weatherapi returned 401 Unauthorized");
3376        let out = NodeOutput::with_error("call_weather", err.as_ref());
3377        assert!(!out.ok);
3378        assert_eq!(out.payload, Value::Null);
3379        assert_eq!(out.meta["error"]["kind"], "flow_node_failed");
3380        assert_eq!(out.meta["error"]["node_id"], "call_weather");
3381        assert_eq!(
3382            out.meta["error"]["message"],
3383            "weatherapi returned 401 Unauthorized"
3384        );
3385    }
3386
3387    #[test]
3388    fn lift_first_node_error_promotes_node_meta_to_output_metadata() {
3389        // Two nodes ran; the first failed, the second produced a default-
3390        // looking output (flow author wrote no error routing). The executor
3391        // must lift the first failure into output.metadata so the messaging
3392        // provider renders the error card without any flow-author changes.
3393        let mut nodes: HashMap<String, NodeOutput> = HashMap::new();
3394        let err: Box<dyn std::error::Error + 'static> =
3395            Box::<dyn std::error::Error + 'static>::from("weatherapi returned 401 Unauthorized");
3396        nodes.insert(
3397            "call_weather".to_string(),
3398            NodeOutput::with_error("call_weather", err.as_ref()),
3399        );
3400        nodes.insert(
3401            "render_current_card".to_string(),
3402            NodeOutput::new(json!({ "text": "message" })),
3403        );
3404
3405        let final_output = json!({ "text": "message" });
3406        let enriched = lift_first_node_error_from_nodes(final_output, &nodes);
3407        assert_eq!(
3408            enriched["metadata"]["error_kind"], "flow_node_failed",
3409            "first failing node's kind must be lifted"
3410        );
3411        assert_eq!(
3412            enriched["metadata"]["error_message"],
3413            "weatherapi returned 401 Unauthorized"
3414        );
3415        assert_eq!(enriched["metadata"]["node_id"], "call_weather");
3416        // Preserves the original payload bits so downstream renderers still
3417        // see what the flow produced.
3418        assert_eq!(enriched["text"], "message");
3419    }
3420
3421    #[test]
3422    fn lift_first_node_error_is_noop_when_all_nodes_ok() {
3423        let mut nodes: HashMap<String, NodeOutput> = HashMap::new();
3424        nodes.insert(
3425            "ok_node".to_string(),
3426            NodeOutput::new(json!({ "text": "all good" })),
3427        );
3428        let output = json!({ "text": "all good" });
3429        let lifted = lift_first_node_error_from_nodes(output.clone(), &nodes);
3430        assert_eq!(lifted, output);
3431    }
3432
3433    #[tokio::test]
3434    async fn execute_user_facing_flow_failure_returns_completed_with_error_envelope() {
3435        // Flow whose start node is missing — drive_flow will return Err on
3436        // node lookup. With session_id present, execute() must convert that
3437        // to a Completed FlowExecution carrying error_kind/error_message in
3438        // output.metadata so the chat user sees the error card.
3439        let flow_id_str = "broken.flow";
3440        let pack_id_str = "test-pack";
3441        let host_flow = host_flow_for_test(flow_id_str, &["only-node"], Some("does-not-exist"));
3442        let engine = FlowEngine {
3443            packs: Vec::new(),
3444            flows: Vec::new(),
3445            flow_sources: HashMap::new(),
3446            flow_cache: RwLock::new(HashMap::from([(
3447                FlowKey {
3448                    pack_id: pack_id_str.to_string(),
3449                    flow_id: flow_id_str.to_string(),
3450                },
3451                host_flow,
3452            )])),
3453            default_env: "local".to_string(),
3454            validation: ValidationConfig {
3455                mode: ValidationMode::Off,
3456            },
3457            cross_pack_resolver: None,
3458            rollout_ids: RolloutIds::default(),
3459        };
3460        let ctx = FlowContext {
3461            tenant: "demo",
3462            pack_id: pack_id_str,
3463            flow_id: flow_id_str,
3464            node_id: None,
3465            tool: None,
3466            action: None,
3467            session_id: Some("conv-1"),
3468            provider_id: None,
3469            retry_config: RetryConfig {
3470                max_attempts: 1,
3471                base_delay_ms: 1,
3472            },
3473            attempt: 1,
3474            observer: None,
3475            mocks: None,
3476        };
3477        let result = engine
3478            .execute(ctx, Value::Null)
3479            .await
3480            .expect("must not propagate Err");
3481        assert!(matches!(result.status, FlowStatus::Completed));
3482        assert_eq!(
3483            result.output["metadata"]["error_kind"],
3484            "flow_execution_failed"
3485        );
3486        let msg = result.output["metadata"]["error_message"]
3487            .as_str()
3488            .unwrap_or("");
3489        assert!(!msg.is_empty(), "error_message must be populated");
3490        assert_eq!(result.output["metadata"]["flow_id"], "broken.flow");
3491    }
3492
3493    #[test]
3494    fn mcp_tool_error_recognises_generator_error_shape() {
3495        // greentic-mcp-generator's tool_error_with_status emits this exact
3496        // shape when the upstream HTTP call to weatherapi.com returns 401.
3497        let value = json!({
3498            "error": {
3499                "code": "tool_error",
3500                "message": "API request returned status 401",
3501                "status": 401
3502            }
3503        });
3504        let (code, message) = mcp_tool_error(&value).expect("must detect MCP error shape");
3505        assert_eq!(code, "tool_error");
3506        assert!(message.contains("API request returned status 401"));
3507        assert!(message.contains("(status 401)"));
3508    }
3509
3510    #[test]
3511    fn mcp_tool_error_skips_success_responses() {
3512        // A success response uses `result`, not `error`.
3513        let value = json!({ "result": { "current": { "temp_c": 19.0 } } });
3514        assert!(mcp_tool_error(&value).is_none());
3515    }
3516
3517    #[test]
3518    fn mcp_tool_error_skips_non_object_and_unrelated_shapes() {
3519        assert!(mcp_tool_error(&Value::Null).is_none());
3520        assert!(mcp_tool_error(&json!({"unrelated": true})).is_none());
3521        // `error` must be an object; a string isn't enough.
3522        assert!(mcp_tool_error(&json!({"error": "oops"})).is_none());
3523    }
3524
3525    #[tokio::test]
3526    async fn execute_non_user_facing_flow_failure_still_propagates() {
3527        // No session_id => internal job. Errors still propagate as Err so
3528        // operator alerting / metrics pipelines stay intact.
3529        let flow_id_str = "broken.flow";
3530        let pack_id_str = "test-pack";
3531        let host_flow = host_flow_for_test(flow_id_str, &["only-node"], Some("does-not-exist"));
3532        let engine = FlowEngine {
3533            packs: Vec::new(),
3534            flows: Vec::new(),
3535            flow_sources: HashMap::new(),
3536            flow_cache: RwLock::new(HashMap::from([(
3537                FlowKey {
3538                    pack_id: pack_id_str.to_string(),
3539                    flow_id: flow_id_str.to_string(),
3540                },
3541                host_flow,
3542            )])),
3543            default_env: "local".to_string(),
3544            validation: ValidationConfig {
3545                mode: ValidationMode::Off,
3546            },
3547            cross_pack_resolver: None,
3548            rollout_ids: RolloutIds::default(),
3549        };
3550        let ctx = FlowContext {
3551            tenant: "demo",
3552            pack_id: pack_id_str,
3553            flow_id: flow_id_str,
3554            node_id: None,
3555            tool: None,
3556            action: None,
3557            session_id: None,
3558            provider_id: None,
3559            retry_config: RetryConfig {
3560                max_attempts: 1,
3561                base_delay_ms: 1,
3562            },
3563            attempt: 1,
3564            observer: None,
3565            mocks: None,
3566        };
3567        let result = engine.execute(ctx, Value::Null).await;
3568        assert!(result.is_err(), "non-user-facing flow must propagate Err");
3569    }
3570
3571    // ---- Phase D: slot_schema injection tests ----
3572
3573    #[test]
3574    fn host_flow_extracts_slot_schema_from_metadata_extra() {
3575        use greentic_types::FlowMetadata;
3576        use std::collections::BTreeSet;
3577
3578        let schema = json!([
3579            {"name": "counterparty", "slot_type": "string", "required": true},
3580            {"name": "due_date", "slot_type": "date", "required": true}
3581        ]);
3582        let flow = Flow {
3583            schema_version: "flow-v1".into(),
3584            id: FlowId::from_str("test.flow").unwrap(),
3585            kind: FlowKind::Messaging,
3586            entrypoints: BTreeMap::new(),
3587            nodes: IndexMap::default(),
3588            metadata: FlowMetadata {
3589                title: None,
3590                description: None,
3591                tags: BTreeSet::new(),
3592                extra: json!({(SLOT_SCHEMA_METADATA_KEY): schema}),
3593            },
3594        };
3595        let host = HostFlow::from(flow);
3596        assert_eq!(
3597            host.slot_schema.as_ref(),
3598            Some(&schema),
3599            "HostFlow must extract slot_schema from metadata.extra"
3600        );
3601    }
3602
3603    #[test]
3604    fn host_flow_slot_schema_is_none_when_absent() {
3605        let flow = Flow {
3606            schema_version: "flow-v1".into(),
3607            id: FlowId::from_str("test.flow").unwrap(),
3608            kind: FlowKind::Messaging,
3609            entrypoints: BTreeMap::new(),
3610            nodes: IndexMap::default(),
3611            metadata: Default::default(),
3612        };
3613        let host = HostFlow::from(flow);
3614        assert!(
3615            host.slot_schema.is_none(),
3616            "HostFlow.slot_schema must be None when metadata.extra has no greentic.slot_schema"
3617        );
3618    }
3619
3620    #[test]
3621    fn inject_slot_definitions_adds_to_object_input() {
3622        let schema = json!([
3623            {"name": "city", "slot_type": "string"}
3624        ]);
3625        let mut input = json!({"utterance": "hello"});
3626        inject_slot_definitions(&mut input, &schema, "f", "n");
3627        assert_eq!(
3628            input,
3629            json!({"utterance": "hello", "slot_definitions": schema}),
3630            "slot_definitions must be injected into existing object"
3631        );
3632    }
3633
3634    #[test]
3635    fn inject_slot_definitions_wraps_null_input() {
3636        let schema = json!([{"name": "x", "slot_type": "string"}]);
3637        let mut input = Value::Null;
3638        inject_slot_definitions(&mut input, &schema, "f", "n");
3639        assert_eq!(
3640            input,
3641            json!({"slot_definitions": schema}),
3642            "null input must become an object with slot_definitions"
3643        );
3644    }
3645
3646    #[test]
3647    fn inject_slot_definitions_preserves_explicit_inline() {
3648        let flow_schema = json!([{"name": "city", "slot_type": "string"}]);
3649        let inline_defs = json!([{"name": "country", "slot_type": "string"}]);
3650        let mut input = json!({
3651            "utterance": "hello",
3652            "slot_definitions": inline_defs
3653        });
3654        inject_slot_definitions(&mut input, &flow_schema, "f", "n");
3655        assert_eq!(
3656            input["slot_definitions"], inline_defs,
3657            "explicit inline slot_definitions must not be overwritten"
3658        );
3659    }
3660
3661    #[test]
3662    fn inject_slot_definitions_skips_non_object_input() {
3663        let schema = json!([{"name": "x", "slot_type": "string"}]);
3664        let mut input = json!("a string");
3665        inject_slot_definitions(&mut input, &schema, "f", "n");
3666        assert_eq!(
3667            input,
3668            json!("a string"),
3669            "non-object input must be left unchanged"
3670        );
3671    }
3672
3673    fn make_flow_doc_for_test(
3674        id: &str,
3675        node_name: &str,
3676        component: &str,
3677        slot_schema: Option<Value>,
3678    ) -> greentic_flow::model::FlowDoc {
3679        use greentic_flow::model::{FlowDoc, NodeDoc};
3680
3681        let mut nodes = IndexMap::new();
3682        nodes.insert(
3683            node_name.to_string(),
3684            NodeDoc {
3685                raw: {
3686                    let mut m = IndexMap::new();
3687                    m.insert(
3688                        "component.exec".to_string(),
3689                        json!({ "component": component }),
3690                    );
3691                    m
3692                },
3693                routing: json!([{ "out": true }]),
3694                ..Default::default()
3695            },
3696        );
3697
3698        FlowDoc {
3699            id: id.into(),
3700            title: None,
3701            description: None,
3702            flow_type: "messaging".into(),
3703            start: Some(node_name.into()),
3704            parameters: json!({}),
3705            tags: Vec::new(),
3706            schema_version: None,
3707            entrypoints: IndexMap::new(),
3708            meta: None,
3709            slot_schema,
3710            nodes,
3711        }
3712    }
3713
3714    /// Integration test: exercises the real `greentic_flow::compile_flow`
3715    /// producer path with a `FlowDoc` carrying `slot_schema`, then converts
3716    /// through `HostFlow::from` and verifies the runtime-side `slot_schema`
3717    /// field is populated — closing the gap Codex flagged where the existing
3718    /// unit tests constructed `FlowMetadata` directly.
3719    #[test]
3720    fn compile_flow_round_trips_slot_schema_into_host_flow() {
3721        let slot_defs = json!([
3722            { "name": "counterparty", "slot_type": "string", "required": true,
3723              "pattern": ".+" },
3724            { "name": "due_date", "slot_type": "date", "required": true,
3725              "pattern": "\\d{4}-\\d{2}-\\d{2}" }
3726        ]);
3727        let doc = make_flow_doc_for_test(
3728            "slot-test",
3729            "extractor",
3730            "slot-extractor",
3731            Some(slot_defs.clone()),
3732        );
3733
3734        let flow = greentic_flow::compile_flow(doc).expect("compile_flow must succeed");
3735        assert_eq!(
3736            flow.metadata.extra.get(SLOT_SCHEMA_METADATA_KEY),
3737            Some(&slot_defs),
3738            "compile_flow must forward slot_schema into metadata.extra"
3739        );
3740
3741        let host = HostFlow::from(flow);
3742        assert_eq!(
3743            host.slot_schema.as_ref(),
3744            Some(&slot_defs),
3745            "HostFlow.slot_schema must survive the compile_flow -> HostFlow round-trip"
3746        );
3747    }
3748
3749    /// Verify that `compile_flow` without `slot_schema` produces a `Flow`
3750    /// whose `metadata.extra` has no `greentic.slot_schema` key, and that
3751    /// `HostFlow.slot_schema` stays `None` through the real compile path.
3752    #[test]
3753    fn compile_flow_without_slot_schema_leaves_host_flow_none() {
3754        let doc = make_flow_doc_for_test("no-slots", "echo", "echo", None);
3755
3756        let flow = greentic_flow::compile_flow(doc).expect("compile_flow must succeed");
3757        assert!(
3758            flow.metadata.extra.get(SLOT_SCHEMA_METADATA_KEY).is_none(),
3759            "metadata.extra must not contain greentic.slot_schema when FlowDoc.slot_schema is None"
3760        );
3761
3762        let host = HostFlow::from(flow);
3763        assert!(
3764            host.slot_schema.is_none(),
3765            "HostFlow.slot_schema must be None when FlowDoc has no slot_schema"
3766        );
3767    }
3768}
3769
3770use tracing::Instrument;
3771
3772pub struct FlowContext<'a> {
3773    pub tenant: &'a str,
3774    pub pack_id: &'a str,
3775    pub flow_id: &'a str,
3776    pub node_id: Option<&'a str>,
3777    pub tool: Option<&'a str>,
3778    pub action: Option<&'a str>,
3779    pub session_id: Option<&'a str>,
3780    pub provider_id: Option<&'a str>,
3781    pub retry_config: RetryConfig,
3782    pub attempt: u32,
3783    pub observer: Option<&'a dyn ExecutionObserver>,
3784    pub mocks: Option<&'a MockLayer>,
3785}
3786
3787#[derive(Copy, Clone)]
3788pub struct RetryConfig {
3789    pub max_attempts: u32,
3790    pub base_delay_ms: u64,
3791}
3792
3793/// Look across all node outputs, find the first one that finished with
3794/// `ok=false`, and lift its `meta.error` fields into
3795/// `output.metadata.error_kind` / `.error_message` / `.node_id`. Returns the
3796/// (possibly enriched) output unchanged otherwise.
3797///
3798/// This is how the executor "shows" an unhandled flow-node failure to the
3799/// caller without the flow author having to add error routing: the chat-side
3800/// provider (messaging-providers `extract_error_envelope`) picks the lifted
3801/// fields off `output.metadata` and renders a styled error card.
3802///
3803/// Takes a borrow of the node-output map rather than the whole
3804/// `ExecutionState` because the callers have already consumed `state` via
3805/// `state.finalize_with(...)`; we capture a cheap clone of `state.nodes` up
3806/// front and pass it in here.
3807fn lift_first_node_error_from_nodes(output: Value, nodes: &HashMap<String, NodeOutput>) -> Value {
3808    let Some((node_id, failed)) = nodes.iter().find(|(_, out)| !out.ok) else {
3809        return output;
3810    };
3811    let err_meta = failed.meta.get("error");
3812    let message = err_meta
3813        .and_then(|e| e.get("message"))
3814        .and_then(|v| v.as_str())
3815        .unwrap_or("flow node failed");
3816    let kind = err_meta
3817        .and_then(|e| e.get("kind"))
3818        .and_then(|v| v.as_str())
3819        .unwrap_or("flow_node_failed");
3820
3821    let mut output = match output {
3822        Value::Object(map) => map,
3823        Value::Null => JsonMap::new(),
3824        other => {
3825            let mut wrap = JsonMap::new();
3826            wrap.insert("payload".to_string(), other);
3827            wrap
3828        }
3829    };
3830    let metadata_entry = output
3831        .entry("metadata".to_string())
3832        .or_insert_with(|| Value::Object(JsonMap::new()));
3833    let metadata_map = match metadata_entry {
3834        Value::Object(map) => map,
3835        _ => {
3836            *metadata_entry = Value::Object(JsonMap::new());
3837            metadata_entry.as_object_mut().unwrap()
3838        }
3839    };
3840    metadata_map
3841        .entry("error_kind".to_string())
3842        .or_insert(Value::String(kind.to_string()));
3843    metadata_map
3844        .entry("error_message".to_string())
3845        .or_insert(Value::String(message.to_string()));
3846    metadata_map
3847        .entry("node_id".to_string())
3848        .or_insert(Value::String(node_id.clone()));
3849    Value::Object(output)
3850}
3851
3852fn should_retry(err: &anyhow::Error) -> bool {
3853    let lower = err.to_string().to_lowercase();
3854    lower.contains("transient")
3855        || lower.contains("unavailable")
3856        || lower.contains("internal")
3857        || lower.contains("timeout")
3858}
3859
3860impl From<FlowRetryConfig> for RetryConfig {
3861    fn from(value: FlowRetryConfig) -> Self {
3862        Self {
3863            max_attempts: value.max_attempts.max(1),
3864            base_delay_ms: value.base_delay_ms.max(50),
3865        }
3866    }
3867}