Skip to main content

greentic_runner_host/runner/
engine.rs

1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use super::templating::{TemplateOptions, render_template_value};
18use crate::config::{FlowRetryConfig, HostConfig};
19use crate::pack::{FlowDescriptor, PackRuntime};
20use crate::runner::invocation::{InvocationMeta, build_invocation_envelope};
21use crate::telemetry::{
22    FlowSpanAttributes, RolloutIds, annotate_span, backoff_delay_ms, set_flow_context,
23};
24#[cfg(feature = "fault-injection")]
25use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
26use crate::validate::{
27    ValidationConfig, ValidationIssue, ValidationMode, validate_component_envelope,
28    validate_tool_envelope,
29};
30use greentic_flow::SLOT_SCHEMA_METADATA_KEY;
31use greentic_types::{Flow, Node, NodeId, Routing};
32
/// Identifier of the slot-extractor WASM component.
///
/// The engine compares a node's target component against this value to
/// recognise slot-extractor nodes; for those nodes the flow-level
/// `slot_schema` is injected into the invocation payload as
/// `slot_definitions` (Phase D).
const SLOT_EXTRACTOR_COMPONENT_ID: &str = "ai.greentic.component-slot-extractor";
37
38/// Callback trait for resolving cross-pack provider invocations.
39///
40/// When a `provider.invoke` node references a provider that is not in the
41/// current pack, the flow engine calls this resolver as a fallback.
42/// Implementations typically delegate to a capability registry that knows
43/// about all packs in the bundle.
44pub trait CrossPackResolver: Send + Sync {
45    fn invoke(
46        &self,
47        provider_id: &str,
48        provider_type: Option<&str>,
49        op: &str,
50        input: &[u8],
51        tenant: &str,
52        team: Option<&str>,
53    ) -> Result<Value>;
54}
55
56pub struct FlowEngine {
57    packs: Vec<Arc<PackRuntime>>,
58    flows: Vec<FlowDescriptor>,
59    flow_sources: HashMap<FlowKey, usize>,
60    flow_cache: RwLock<HashMap<FlowKey, HostFlow>>,
61    default_env: String,
62    validation: ValidationConfig,
63    cross_pack_resolver: Option<Arc<dyn CrossPackResolver>>,
64    /// Rollout identifiers of the revision-keyed runtime this engine belongs to,
65    /// stamped onto every per-invocation `TenantCtx` for telemetry attribution
66    /// (C5.4). Empty for tenant-only (legacy) runtimes; the Phase-D revision
67    /// dispatcher supplies real IDs via [`with_rollout_ids`](Self::with_rollout_ids).
68    rollout_ids: RolloutIds,
69}
70
/// Hashable identity of a flow: the owning pack plus the flow id.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct FlowKey {
    /// Identifier of the pack that declares the flow.
    pack_id: String,
    /// Identifier of the flow within that pack.
    flow_id: String,
}
76
77#[derive(Clone, Debug, Serialize, Deserialize)]
78pub struct FlowSnapshot {
79    pub pack_id: String,
80    pub flow_id: String,
81    #[serde(default, skip_serializing_if = "Option::is_none")]
82    pub next_flow: Option<String>,
83    pub next_node: String,
84    pub state: ExecutionState,
85}
86
87#[derive(Clone, Debug)]
88pub struct FlowWait {
89    pub reason: Option<String>,
90    pub snapshot: FlowSnapshot,
91}
92
93#[derive(Clone, Debug)]
94pub enum FlowStatus {
95    Completed,
96    Waiting(Box<FlowWait>),
97}
98
99#[derive(Clone, Debug)]
100pub struct FlowExecution {
101    pub output: Value,
102    pub status: FlowStatus,
103}
104
105#[derive(Clone, Debug)]
106struct HostFlow {
107    id: String,
108    start: Option<NodeId>,
109    nodes: IndexMap<NodeId, HostNode>,
110    /// Flow-level slot definitions extracted from `metadata.extra["greentic.slot_schema"]`.
111    /// Injected into slot-extractor component invocations at dispatch time (Phase D).
112    slot_schema: Option<Value>,
113}
114
115#[derive(Clone, Debug)]
116pub struct HostNode {
117    kind: NodeKind,
118    /// Backwards-compatible component label for observers/transcript.
119    pub component: String,
120    component_id: String,
121    operation_name: Option<String>,
122    operation_in_mapping: Option<String>,
123    payload_expr: Value,
124    routing: Routing,
125}
126
127impl HostNode {
128    pub fn component_id(&self) -> &str {
129        &self.component_id
130    }
131
132    pub fn operation_name(&self) -> Option<&str> {
133        self.operation_name.as_deref()
134    }
135
136    pub fn operation_in_mapping(&self) -> Option<&str> {
137        self.operation_in_mapping.as_deref()
138    }
139}
140
141#[derive(Clone, Debug)]
142enum NodeKind {
143    Exec { target_component: String },
144    PackComponent { component_ref: String },
145    ProviderInvoke,
146    FlowCall,
147    BuiltinEmit { kind: EmitKind },
148    BuiltinStateGet,
149    BuiltinStateSet,
150    Wait,
151}
152
/// Flavour of a built-in emit node.
#[derive(Debug, Clone)]
enum EmitKind {
    /// Log emit.
    Log,
    /// Response emit.
    Response,
    /// Any other `emit.*` component; carries the component name, which is
    /// only used for a debug log line at dispatch time.
    Other(String),
}
159
/// Borrowed overrides applied when executing a component node.
struct ComponentOverrides<'a> {
    /// Component to use in place of the node's own, when set.
    component: Option<&'a str>,
    /// Operation to use in place of the node's own, when set.
    operation: Option<&'a str>,
}
164
165struct ComponentCall {
166    component_ref: String,
167    operation: String,
168    input: Value,
169    config: Value,
170}
171
172impl FlowExecution {
173    fn completed(output: Value) -> Self {
174        Self {
175            output,
176            status: FlowStatus::Completed,
177        }
178    }
179
180    fn waiting(output: Value, wait: FlowWait) -> Self {
181        Self {
182            output,
183            status: FlowStatus::Waiting(Box::new(wait)),
184        }
185    }
186}
187
188impl FlowEngine {
189    pub async fn new(packs: Vec<Arc<PackRuntime>>, config: Arc<HostConfig>) -> Result<Self> {
190        let mut flow_sources: HashMap<FlowKey, usize> = HashMap::new();
191        let mut descriptors = Vec::new();
192        let mut bindings = HashMap::new();
193        for pack in &config.pack_bindings {
194            bindings.insert(pack.pack_id.clone(), pack.flows.clone());
195        }
196        let enforce_bindings = !bindings.is_empty();
197        for (idx, pack) in packs.iter().enumerate() {
198            let pack_id = pack.metadata().pack_id.clone();
199            if enforce_bindings && !bindings.contains_key(&pack_id) {
200                bail!("no gtbind entries found for pack {}", pack_id);
201            }
202            let flows = pack.list_flows().await?;
203            let allowed = bindings.get(&pack_id).map(|flows| {
204                flows
205                    .iter()
206                    .cloned()
207                    .collect::<std::collections::HashSet<_>>()
208            });
209            let mut seen = std::collections::HashSet::new();
210            for flow in flows {
211                if let Some(ref allow) = allowed
212                    && !allow.contains(&flow.id)
213                {
214                    continue;
215                }
216                seen.insert(flow.id.clone());
217                tracing::info!(
218                    flow_id = %flow.id,
219                    flow_type = %flow.flow_type,
220                    pack_id = %flow.pack_id,
221                    pack_index = idx,
222                    "registered flow"
223                );
224                if let Ok(flow_ir) = pack.load_flow(&flow.id) {
225                    for node in flow_ir.nodes.values() {
226                        config
227                            .secrets_policy
228                            .register_flow_secret_refs(&node.input.mapping);
229                        config
230                            .secrets_policy
231                            .register_flow_secret_refs(&node.output.mapping);
232                    }
233                }
234                flow_sources.insert(
235                    FlowKey {
236                        pack_id: flow.pack_id.clone(),
237                        flow_id: flow.id.clone(),
238                    },
239                    idx,
240                );
241                descriptors.retain(|existing: &FlowDescriptor| {
242                    !(existing.id == flow.id && existing.pack_id == flow.pack_id)
243                });
244                descriptors.push(flow);
245            }
246            if let Some(allow) = allowed {
247                let missing = allow.difference(&seen).cloned().collect::<Vec<_>>();
248                if !missing.is_empty() {
249                    bail!(
250                        "gtbind flow ids missing in pack {}: {}",
251                        pack_id,
252                        missing.join(", ")
253                    );
254                }
255            }
256        }
257
258        let mut flow_map = HashMap::new();
259        for flow in &descriptors {
260            let pack_id = flow.pack_id.clone();
261            if let Some(&pack_idx) = flow_sources.get(&FlowKey {
262                pack_id: pack_id.clone(),
263                flow_id: flow.id.clone(),
264            }) {
265                let pack_clone = Arc::clone(&packs[pack_idx]);
266                let flow_id = flow.id.clone();
267                let task_flow_id = flow_id.clone();
268                match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
269                    Ok(Ok(loaded_flow)) => {
270                        flow_map.insert(
271                            FlowKey {
272                                pack_id: pack_id.clone(),
273                                flow_id,
274                            },
275                            HostFlow::from(loaded_flow),
276                        );
277                    }
278                    Ok(Err(err)) => {
279                        tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
280                    }
281                    Err(err) => {
282                        tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
283                    }
284                }
285            }
286        }
287
288        Ok(Self {
289            packs,
290            flows: descriptors,
291            flow_sources,
292            flow_cache: RwLock::new(flow_map),
293            default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
294            validation: config.validation.clone(),
295            cross_pack_resolver: None,
296            rollout_ids: RolloutIds::default(),
297        })
298    }
299
300    /// Bind the rollout identifiers of the revision-keyed runtime this engine
301    /// serves, so every invocation's telemetry carries deployment/bundle/
302    /// revision attribution (C5.4). Called by the Phase-D revision dispatcher
303    /// when it constructs a revision runtime; tenant-only runtimes leave the
304    /// default (empty) IDs.
305    pub fn with_rollout_ids(mut self, rollout_ids: RolloutIds) -> Self {
306        self.rollout_ids = rollout_ids;
307        self
308    }
309
310    /// The rollout identifiers bound to this engine (read counterpart to
311    /// [`with_rollout_ids`](Self::with_rollout_ids)). Empty by default for the
312    /// legacy tenant-only path.
313    pub fn rollout_ids(&self) -> &RolloutIds {
314        &self.rollout_ids
315    }
316
317    /// Set an optional cross-pack resolver for `provider.invoke` nodes that
318    /// reference providers in other packs (resolved via capability registry).
319    pub fn set_cross_pack_resolver(&mut self, resolver: Arc<dyn CrossPackResolver>) {
320        self.cross_pack_resolver = Some(resolver);
321    }
322
323    async fn get_or_load_flow(&self, pack_id: &str, flow_id: &str) -> Result<HostFlow> {
324        let key = FlowKey {
325            pack_id: pack_id.to_string(),
326            flow_id: flow_id.to_string(),
327        };
328        if let Some(flow) = self.flow_cache.read().get(&key).cloned() {
329            return Ok(flow);
330        }
331
332        let pack_idx = *self
333            .flow_sources
334            .get(&key)
335            .with_context(|| format!("flow {pack_id}:{flow_id} not registered"))?;
336        let pack = Arc::clone(&self.packs[pack_idx]);
337        let flow_id_owned = flow_id.to_string();
338        let task_flow_id = flow_id_owned.clone();
339        let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
340            .await
341            .context("failed to join flow metadata task")??;
342        let host_flow = HostFlow::from(flow);
343        self.flow_cache.write().insert(
344            FlowKey {
345                pack_id: pack_id.to_string(),
346                flow_id: flow_id_owned.clone(),
347            },
348            host_flow.clone(),
349        );
350        Ok(host_flow)
351    }
352
353    /// Create the `flow.execute` span and install per-invocation telemetry:
354    /// declared span fields, the task-local tenant context, and the **exported**
355    /// `gt.*` attribution — the live `pack_id` plus any rollout identifiers from
356    /// the owning revision runtime (C5.4). Returned for the caller to
357    /// `.instrument()`. Both `execute` and `resume` route through here so every
358    /// per-invocation entry point carries the same attribution.
359    fn flow_execute_span(&self, ctx: &FlowContext<'_>) -> tracing::Span {
360        let span = tracing::info_span!(
361            "flow.execute",
362            tenant = tracing::field::Empty,
363            flow_id = tracing::field::Empty,
364            node_id = tracing::field::Empty,
365            tool = tracing::field::Empty,
366            action = tracing::field::Empty
367        );
368        annotate_span(
369            &span,
370            &FlowSpanAttributes {
371                tenant: ctx.tenant,
372                flow_id: ctx.flow_id,
373                node_id: ctx.node_id,
374                tool: ctx.tool,
375                action: ctx.action,
376            },
377        );
378        set_flow_context(
379            &span,
380            &self.default_env,
381            ctx.tenant,
382            ctx.flow_id,
383            ctx.node_id,
384            ctx.provider_id,
385            ctx.session_id,
386            ctx.pack_id,
387            &self.rollout_ids,
388        );
389        span
390    }
391
392    pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
393        let span = self.flow_execute_span(&ctx);
394        let retry_config = ctx.retry_config;
395        let original_input = input;
396        let mut ctx = ctx;
397        let metric_tenant = ctx.tenant.to_string();
398        let metric_flow_id = ctx.flow_id.to_string();
399        let started = std::time::Instant::now();
400        let result = async move {
401            let mut attempt = 0u32;
402            loop {
403                attempt += 1;
404                ctx.attempt = attempt;
405                #[cfg(feature = "fault-injection")]
406                {
407                    let fault_ctx = FaultContext {
408                        pack_id: ctx.pack_id,
409                        flow_id: ctx.flow_id,
410                        node_id: ctx.node_id,
411                        attempt: ctx.attempt,
412                    };
413                    maybe_fail(FaultPoint::Timeout, fault_ctx)
414                        .map_err(|err| anyhow!(err.to_string()))?;
415                }
416                match self.execute_once(&ctx, original_input.clone()).await {
417                    Ok(value) => return Ok(value),
418                    Err(err) => {
419                        if attempt >= retry_config.max_attempts || !should_retry(&err) {
420                            // User-facing session flows surface the terminal
421                            // error as a metadata-only Ok envelope so the
422                            // messaging provider renders it instead of leaking
423                            // raw engine text to the chat.
424                            if ctx.session_id.is_some() {
425                                return Ok(FlowExecution::completed(json!({
426                                    "metadata": {
427                                        "error_kind": "flow_execution_failed",
428                                        "error_message": err.to_string(),
429                                        "flow_id": ctx.flow_id,
430                                    }
431                                })));
432                            }
433                            return Err(err);
434                        }
435                        let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
436                        tracing::warn!(
437                            tenant = ctx.tenant,
438                            flow_id = ctx.flow_id,
439                            attempt,
440                            max_attempts = retry_config.max_attempts,
441                            delay_ms = delay,
442                            error = %err,
443                            "transient flow execution failure, backing off"
444                        );
445                        tokio::time::sleep(Duration::from_millis(delay)).await;
446                    }
447                }
448            }
449        }
450        .instrument(span)
451        .await;
452        let status = if result.is_ok() { "ok" } else { "err" };
453        let duration_ms = started.elapsed().as_secs_f64() * 1000.0;
454        crate::metrics::record_flow_execution(&metric_tenant, &metric_flow_id, status, duration_ms);
455        result
456    }
457
458    pub async fn resume(
459        &self,
460        ctx: FlowContext<'_>,
461        snapshot: FlowSnapshot,
462        input: Value,
463    ) -> Result<FlowExecution> {
464        if snapshot.pack_id != ctx.pack_id {
465            bail!(
466                "snapshot pack {} does not match requested {}",
467                snapshot.pack_id,
468                ctx.pack_id
469            );
470        }
471        let resume_flow = snapshot
472            .next_flow
473            .clone()
474            .unwrap_or_else(|| snapshot.flow_id.clone());
475        let flow_ir = self.get_or_load_flow(ctx.pack_id, &resume_flow).await?;
476        let mut state = snapshot.state;
477        // Replace BOTH `input` AND `entry` with the new activity. The
478        // routing context (built by `build_routing_context`) reads
479        // `entry.input.metadata.*` for the synthesised `response.*` fields
480        // that conditional routes test against — keeping the snapshot's
481        // stale entry would make `response.action` perpetually empty and
482        // every condition fail, looping the user back to the wait point
483        // forever. `replace_input` only touches `state.input`, so we have
484        // to refresh `entry` ourselves; `ensure_entry` is a no-op once
485        // entry is non-null.
486        state.replace_input(input.clone());
487        state.entry = input;
488        let span = self.flow_execute_span(&ctx);
489        self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node), resume_flow)
490            .instrument(span)
491            .await
492    }
493
494    async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
495        let flow_ir = self.get_or_load_flow(ctx.pack_id, ctx.flow_id).await?;
496        let state = ExecutionState::new(input);
497        self.drive_flow(ctx, flow_ir, state, None, ctx.flow_id.to_string())
498            .await
499    }
500
501    async fn drive_flow(
502        &self,
503        ctx: &FlowContext<'_>,
504        mut flow_ir: HostFlow,
505        mut state: ExecutionState,
506        resume_from: Option<String>,
507        mut current_flow_id: String,
508    ) -> Result<FlowExecution> {
509        let mut current = match resume_from {
510            Some(node) => NodeId::from_str(&node)
511                .with_context(|| format!("invalid resume node id `{node}`"))?,
512            None => flow_ir
513                .start
514                .clone()
515                .or_else(|| flow_ir.nodes.keys().next().cloned())
516                .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
517        };
518
519        loop {
520            let step_ctx = FlowContext {
521                tenant: ctx.tenant,
522                pack_id: ctx.pack_id,
523                flow_id: current_flow_id.as_str(),
524                node_id: ctx.node_id,
525                tool: ctx.tool,
526                action: ctx.action,
527                session_id: ctx.session_id,
528                provider_id: ctx.provider_id,
529                retry_config: ctx.retry_config,
530                attempt: ctx.attempt,
531                observer: ctx.observer,
532                mocks: ctx.mocks,
533            };
534            let node = flow_ir
535                .nodes
536                .get(&current)
537                .with_context(|| format!("node {} not found", current.as_str()))?;
538
539            let payload_template = node.payload_expr.clone();
540            let prev = state
541                .last_output
542                .as_ref()
543                .cloned()
544                .unwrap_or_else(|| Value::Object(JsonMap::new()));
545            let ctx_value = template_context(&state, prev);
546            #[cfg(feature = "fault-injection")]
547            {
548                let fault_ctx = FaultContext {
549                    pack_id: ctx.pack_id,
550                    flow_id: ctx.flow_id,
551                    node_id: Some(current.as_str()),
552                    attempt: ctx.attempt,
553                };
554                maybe_fail(FaultPoint::TemplateRender, fault_ctx)
555                    .map_err(|err| anyhow!(err.to_string()))?;
556            }
557            let mut payload =
558                render_template_value(&payload_template, &ctx_value, TemplateOptions::default())
559                    .context("failed to render node input template")?;
560            let node_id = current.clone();
561
562            // Phase D: inject flow-level slot_schema as slot_definitions into
563            // the slot-extractor's input when the author omitted inline
564            // definitions. Explicit inline `slot_definitions` win
565            // (back-compat with M2.4 NDA demo).
566            if let NodeKind::Exec { target_component } = &node.kind
567                && target_component == SLOT_EXTRACTOR_COMPONENT_ID
568                && let Some(schema) = flow_ir.slot_schema.as_ref()
569                && let Some(map) = payload.as_object_mut()
570            {
571                let input = map.entry("input").or_insert(Value::Null);
572                inject_slot_definitions(input, schema, step_ctx.flow_id, node_id.as_str());
573            }
574
575            let observed_payload = payload.clone();
576            let event = NodeEvent {
577                context: &step_ctx,
578                node_id: node_id.as_str(),
579                node,
580                payload: &observed_payload,
581            };
582            if let Some(observer) = step_ctx.observer {
583                observer.on_node_start(&event);
584            }
585            let dispatch = self
586                .dispatch_node(
587                    &step_ctx,
588                    node_id.as_str(),
589                    node,
590                    &mut state,
591                    payload,
592                    &event,
593                )
594                .await;
595            let DispatchOutcome { output, control } = match dispatch {
596                Ok(outcome) => outcome,
597                Err(err) => {
598                    if let Some(observer) = step_ctx.observer {
599                        observer.on_node_error(&event, err.as_ref());
600                    }
601                    // Propagate so `execute()`'s retry loop can retry transient
602                    // failures, then convert to a metadata-only Ok envelope at
603                    // the top level once retries are exhausted (session flows).
604                    return Err(err);
605                }
606            };
607
608            state.nodes.insert(node_id.clone().into(), output.clone());
609            state.last_output = Some(output.payload.clone());
610            if let Some(observer) = step_ctx.observer {
611                observer.on_node_end(&event, &output.payload);
612            }
613
614            match control {
615                NodeControl::Continue => {
616                    enum NextDecision {
617                        Next(NodeId),
618                        End,
619                        Wait,
620                    }
621                    let decision = match &node.routing {
622                        Routing::Next { node_id } => NextDecision::Next(node_id.clone()),
623                        Routing::End | Routing::Reply => NextDecision::End,
624                        Routing::Branch { default, .. } => match default {
625                            Some(target) => NextDecision::Next(target.clone()),
626                            None => NextDecision::End,
627                        },
628                        Routing::Custom(raw) => {
629                            match evaluate_custom_routing(raw, &output, &state, &flow_ir, &node_id)
630                            {
631                                CustomRoutingDecision::Next(nid) => NextDecision::Next(nid),
632                                CustomRoutingDecision::End => NextDecision::End,
633                                CustomRoutingDecision::Wait => NextDecision::Wait,
634                            }
635                        }
636                    };
637
638                    match decision {
639                        NextDecision::Next(n) => current = n,
640                        NextDecision::End => {
641                            let nodes_snapshot = state.nodes.clone();
642                            let final_output = state.finalize_with(Some(output.payload.clone()));
643                            return Ok(FlowExecution::completed(lift_first_node_error_from_nodes(
644                                final_output,
645                                &nodes_snapshot,
646                            )));
647                        }
648                        NextDecision::Wait => {
649                            // Conditional routing fell through. Pause at the
650                            // current node so the next inbound activity
651                            // resumes here and re-evaluates this node's
652                            // routing with the user's new submit payload.
653                            let mut snapshot_state = state.clone();
654                            snapshot_state.clear_egress();
655                            let snapshot = FlowSnapshot {
656                                pack_id: step_ctx.pack_id.to_string(),
657                                flow_id: step_ctx.flow_id.to_string(),
658                                next_flow: (current_flow_id != step_ctx.flow_id)
659                                    .then_some(current_flow_id.clone()),
660                                next_node: node_id.as_str().to_string(),
661                                state: snapshot_state,
662                            };
663                            let output_value = state.finalize_with(Some(output.payload.clone()));
664                            return Ok(FlowExecution::waiting(
665                                output_value,
666                                FlowWait {
667                                    reason: Some(format!(
668                                        "awaiting user submit at node `{}`",
669                                        node_id.as_str()
670                                    )),
671                                    snapshot,
672                                },
673                            ));
674                        }
675                    }
676                }
677                NodeControl::Wait { reason } => {
678                    let next: Option<NodeId> = match &node.routing {
679                        Routing::Next { node_id } => Some(node_id.clone()),
680                        Routing::End | Routing::Reply => None,
681                        Routing::Branch { default, .. } => default.clone(),
682                        Routing::Custom(raw) => {
683                            match evaluate_custom_routing(raw, &output, &state, &flow_ir, &node_id)
684                            {
685                                CustomRoutingDecision::Next(nid) => Some(nid),
686                                // session.wait operator must have an
687                                // explicit forward target — both End and
688                                // Wait decisions collapse to "no next" and
689                                // surface the same error below.
690                                CustomRoutingDecision::End | CustomRoutingDecision::Wait => None,
691                            }
692                        }
693                    };
694                    let resume_target = next.ok_or_else(|| {
695                        anyhow!(
696                            "session.wait node {} requires a non-empty route",
697                            current.as_str()
698                        )
699                    })?;
700                    let mut snapshot_state = state.clone();
701                    snapshot_state.clear_egress();
702                    let snapshot = FlowSnapshot {
703                        pack_id: step_ctx.pack_id.to_string(),
704                        flow_id: step_ctx.flow_id.to_string(),
705                        next_flow: (current_flow_id != step_ctx.flow_id)
706                            .then_some(current_flow_id.clone()),
707                        next_node: resume_target.as_str().to_string(),
708                        state: snapshot_state,
709                    };
710                    let output_value = state.clone().finalize_with(None);
711                    return Ok(FlowExecution::waiting(
712                        output_value,
713                        FlowWait { reason, snapshot },
714                    ));
715                }
716                NodeControl::Jump(jump) => {
717                    let jump_target = self.apply_jump(&step_ctx, &mut state, jump).await?;
718                    flow_ir = jump_target.flow;
719                    current_flow_id = jump_target.flow_id;
720                    current = jump_target.node_id;
721                }
722                NodeControl::Respond {
723                    text,
724                    card_cbor,
725                    needs_user,
726                } => {
727                    let response = json!({
728                        "text": text,
729                        "card_cbor": card_cbor,
730                        "needs_user": needs_user,
731                    });
732                    state.push_egress(response);
733                    let nodes_snapshot = state.nodes.clone();
734                    let final_output = state.finalize_with(None);
735                    return Ok(FlowExecution::completed(lift_first_node_error_from_nodes(
736                        final_output,
737                        &nodes_snapshot,
738                    )));
739                }
740            }
741        }
742    }
743
744    async fn dispatch_node(
745        &self,
746        ctx: &FlowContext<'_>,
747        node_id: &str,
748        node: &HostNode,
749        state: &mut ExecutionState,
750        mut payload: Value,
751        event: &NodeEvent<'_>,
752    ) -> Result<DispatchOutcome> {
753        inject_card_locale(&mut payload, &state.entry);
754        match &node.kind {
755            NodeKind::Exec { target_component } => self
756                .execute_component_exec(
757                    ctx,
758                    node_id,
759                    node,
760                    payload,
761                    event,
762                    ComponentOverrides {
763                        component: Some(target_component.as_str()),
764                        operation: node.operation_name.as_deref(),
765                    },
766                )
767                .await
768                .and_then(component_dispatch_outcome),
769            NodeKind::PackComponent { component_ref } => self
770                .execute_component_call(ctx, node_id, node, payload, component_ref.as_str(), event)
771                .await
772                .and_then(component_dispatch_outcome),
773            NodeKind::FlowCall => self
774                .execute_flow_call(ctx, payload)
775                .await
776                .map(DispatchOutcome::complete),
777            NodeKind::ProviderInvoke => self
778                .execute_provider_invoke(ctx, node_id, state, payload, event)
779                .await
780                .map(DispatchOutcome::complete),
781            NodeKind::BuiltinEmit { kind } => {
782                match kind {
783                    EmitKind::Log | EmitKind::Response => {}
784                    EmitKind::Other(component) => {
785                        tracing::debug!(%component, "handling emit.* as builtin");
786                    }
787                }
788                state.push_egress(payload.clone());
789                Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
790            }
791            NodeKind::BuiltinStateGet => self
792                .execute_state_get(ctx, payload)
793                .await
794                .map(DispatchOutcome::complete),
795            NodeKind::BuiltinStateSet => self
796                .execute_state_set(ctx, payload)
797                .await
798                .map(DispatchOutcome::complete),
799            NodeKind::Wait => {
800                let reason = extract_wait_reason(&payload);
801                Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
802            }
803        }
804    }
805
806    async fn execute_state_get(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
807        let key = Self::extract_state_key_helper(&payload)?;
808        let pack = self.pack_for_flow(ctx)?;
809        let store = pack
810            .state_store_handle()
811            .context("state store is not configured for this runtime")?;
812        let tenant_ctx = self.state_tenant_ctx(ctx)?;
813        let state_key = greentic_state::StateKey::new(&key);
814        let value = store
815            .get_json(
816                &tenant_ctx,
817                crate::storage::state::STATE_PREFIX,
818                &state_key,
819                None,
820            )
821            .with_context(|| format!("state.get failed for key `{key}`"))?;
822        let payload = serde_json::json!({
823            "key": key,
824            "value": value,
825            "found": value.is_some(),
826        });
827        Ok(NodeOutput::new(payload))
828    }
829
830    async fn execute_state_set(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
831        let key = Self::extract_state_key_helper(&payload)?;
832        let value = payload.get("value").cloned().unwrap_or(Value::Null);
833        let pack = self.pack_for_flow(ctx)?;
834        let store = pack
835            .state_store_handle()
836            .context("state store is not configured for this runtime")?;
837        let tenant_ctx = self.state_tenant_ctx(ctx)?;
838        let state_key = greentic_state::StateKey::new(&key);
839        store
840            .set_json(
841                &tenant_ctx,
842                crate::storage::state::STATE_PREFIX,
843                &state_key,
844                None,
845                &value,
846                None,
847            )
848            .with_context(|| format!("state.set failed for key `{key}`"))?;
849        let payload = serde_json::json!({ "key": key, "value": value });
850        Ok(NodeOutput::new(payload))
851    }
852
853    fn pack_for_flow(&self, ctx: &FlowContext<'_>) -> Result<&Arc<PackRuntime>> {
854        let key = FlowKey {
855            pack_id: ctx.pack_id.to_string(),
856            flow_id: ctx.flow_id.to_string(),
857        };
858        let idx = self.flow_sources.get(&key).with_context(|| {
859            format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
860        })?;
861        Ok(&self.packs[*idx])
862    }
863
864    fn extract_state_key_helper(payload: &Value) -> Result<String> {
865        payload
866            .get("key")
867            .and_then(Value::as_str)
868            .map(String::from)
869            .filter(|k| !k.is_empty())
870            .context("state node payload missing required `key` (non-empty string)")
871    }
872
873    fn state_tenant_ctx(&self, ctx: &FlowContext<'_>) -> Result<greentic_types::TenantCtx> {
874        let env = greentic_types::EnvId::from_str(&self.default_env)
875            .with_context(|| format!("invalid env id `{}`", self.default_env))?;
876        let tenant = greentic_types::TenantId::from_str(ctx.tenant)
877            .with_context(|| format!("invalid tenant id `{}`", ctx.tenant))?;
878        Ok(greentic_types::TenantCtx::new(env, tenant))
879    }
880
881    async fn apply_jump(
882        &self,
883        ctx: &FlowContext<'_>,
884        state: &mut ExecutionState,
885        jump: JumpControl,
886    ) -> Result<JumpTarget> {
887        let target_flow = jump.flow.trim();
888        if target_flow.is_empty() {
889            bail!("missing_flow");
890        }
891
892        let flow = self
893            .get_or_load_flow(ctx.pack_id, target_flow)
894            .await
895            .with_context(|| format!("unknown_flow:{target_flow}"))?;
896
897        let target_node = if let Some(node) = jump.node.as_deref() {
898            let parsed = NodeId::from_str(node).with_context(|| format!("unknown_node:{node}"))?;
899            if !flow.nodes.contains_key(&parsed) {
900                bail!("unknown_node:{node}");
901            }
902            parsed
903        } else {
904            flow.start
905                .clone()
906                .or_else(|| flow.nodes.keys().next().cloned())
907                .ok_or_else(|| anyhow!("jump_failed: flow {target_flow} has no start node"))?
908        };
909
910        let max_redirects = jump.max_redirects.unwrap_or(3);
911        if state.redirect_count() >= max_redirects {
912            bail!("redirect_limit");
913        }
914        state.increment_redirect_count();
915        state.replace_input(jump.payload.clone());
916        state.last_output = Some(jump.payload);
917        tracing::info!(
918            flow_id = %ctx.flow_id,
919            target_flow = %target_flow,
920            target_node = %target_node.as_str(),
921            reason = ?jump.reason,
922            redirects = state.redirect_count(),
923            "flow.jump.applied"
924        );
925
926        Ok(JumpTarget {
927            flow_id: target_flow.to_string(),
928            flow,
929            node_id: target_node,
930        })
931    }
932
933    async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
934        #[derive(Deserialize)]
935        struct FlowCallPayload {
936            #[serde(alias = "flow")]
937            flow_id: String,
938            #[serde(default)]
939            input: Value,
940        }
941
942        let call: FlowCallPayload =
943            serde_json::from_value(payload).context("invalid payload for flow.call node")?;
944        if call.flow_id.trim().is_empty() {
945            bail!("flow.call requires a non-empty flow_id");
946        }
947
948        let sub_input = if call.input.is_null() {
949            Value::Null
950        } else {
951            call.input
952        };
953
954        let flow_id_owned = call.flow_id;
955        let action = "flow.call";
956        let sub_ctx = FlowContext {
957            tenant: ctx.tenant,
958            pack_id: ctx.pack_id,
959            flow_id: flow_id_owned.as_str(),
960            node_id: None,
961            tool: ctx.tool,
962            action: Some(action),
963            session_id: ctx.session_id,
964            provider_id: ctx.provider_id,
965            retry_config: ctx.retry_config,
966            attempt: ctx.attempt,
967            observer: ctx.observer,
968            mocks: ctx.mocks,
969        };
970
971        let execution = Box::pin(self.execute(sub_ctx, sub_input))
972            .await
973            .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
974        match execution.status {
975            FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
976            FlowStatus::Waiting(wait) => bail!(
977                "flow.call cannot pause (flow {} waiting {:?})",
978                flow_id_owned,
979                wait.reason
980            ),
981        }
982    }
983
984    async fn execute_component_exec(
985        &self,
986        ctx: &FlowContext<'_>,
987        node_id: &str,
988        node: &HostNode,
989        payload: Value,
990        event: &NodeEvent<'_>,
991        overrides: ComponentOverrides<'_>,
992    ) -> Result<NodeOutput> {
993        #[derive(Deserialize)]
994        struct ComponentPayload {
995            #[serde(default, alias = "component_ref", alias = "component")]
996            component: Option<String>,
997            #[serde(alias = "op")]
998            operation: Option<String>,
999            #[serde(default)]
1000            input: Value,
1001            #[serde(default)]
1002            config: Value,
1003        }
1004
1005        let payload: ComponentPayload =
1006            serde_json::from_value(payload).context("invalid payload for component.exec")?;
1007        let component_ref = overrides
1008            .component
1009            .map(str::to_string)
1010            .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
1011            .with_context(|| "component.exec requires a component_ref")?;
1012        let operation = resolve_component_operation(
1013            node_id,
1014            node.component_id.as_str(),
1015            payload.operation,
1016            overrides.operation,
1017            node.operation_in_mapping.as_deref(),
1018        )?;
1019
1020        let call = ComponentCall {
1021            component_ref,
1022            operation,
1023            input: payload.input,
1024            config: payload.config,
1025        };
1026
1027        self.invoke_component_call(ctx, node_id, call, event).await
1028    }
1029
1030    async fn execute_component_call(
1031        &self,
1032        ctx: &FlowContext<'_>,
1033        node_id: &str,
1034        node: &HostNode,
1035        payload: Value,
1036        component_ref: &str,
1037        event: &NodeEvent<'_>,
1038    ) -> Result<NodeOutput> {
1039        let payload_operation = extract_operation_from_mapping(&payload);
1040        let (input, config) = split_operation_payload(payload);
1041        let operation = resolve_component_operation(
1042            node_id,
1043            node.component_id.as_str(),
1044            payload_operation,
1045            node.operation_name.as_deref(),
1046            node.operation_in_mapping.as_deref(),
1047        )?;
1048        let call = ComponentCall {
1049            component_ref: component_ref.to_string(),
1050            operation,
1051            input,
1052            config,
1053        };
1054        self.invoke_component_call(ctx, node_id, call, event).await
1055    }
1056
1057    async fn invoke_component_call(
1058        &self,
1059        ctx: &FlowContext<'_>,
1060        node_id: &str,
1061        mut call: ComponentCall,
1062        event: &NodeEvent<'_>,
1063    ) -> Result<NodeOutput> {
1064        self.validate_component(ctx, event, &call)?;
1065        let key = FlowKey {
1066            pack_id: ctx.pack_id.to_string(),
1067            flow_id: ctx.flow_id.to_string(),
1068        };
1069        let pack_idx = *self.flow_sources.get(&key).with_context(|| {
1070            format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
1071        })?;
1072        let pack = Arc::clone(&self.packs[pack_idx]);
1073
1074        // Promote adaptive-card defaults from node config (default_card_asset /
1075        // default_card_inline / default_source) into the invocation, so the
1076        // component receives a valid `card_spec` field even when the user input
1077        // is empty (e.g. webchat ConversationStart with no text). Without this,
1078        // schema validation in the component reports AC_INVOCATION_MISSING_FIELD
1079        // and the renderer falls back to a generic "Welcome" placeholder.
1080        promote_card_config_to_invocation(&mut call.input, &call.config);
1081
1082        // Pre-resolve card asset paths: read JSON files from the pack's assets
1083        // directory and inject as inline_json so the component doesn't need
1084        // WASI filesystem access.
1085        resolve_card_assets(&mut call.input, &pack);
1086
1087        // When the input is a card-like invocation (has card_source/card_spec),
1088        // pass it directly to the component instead of wrapping in an
1089        // InvocationEnvelope.  The envelope serialises the payload field as a
1090        // byte array which the component cannot decode back, and the
1091        // InvocationPayload::parse heuristic strips domain fields when a
1092        // `payload` key is present (e.g.  the card's Handlebars template
1093        // context `payload: {}`).
1094        let is_card = is_card_invocation(&call.input);
1095
1096        let input_json = if is_card {
1097            serde_json::to_string(&call.input)?
1098        } else {
1099            // Runtime owns ctx; flows must not embed ctx, even if they provide envelopes.
1100            let meta = InvocationMeta {
1101                env: &self.default_env,
1102                tenant: ctx.tenant,
1103                flow_id: ctx.flow_id,
1104                node_id: Some(node_id),
1105                provider_id: ctx.provider_id,
1106                session_id: ctx.session_id,
1107                attempt: ctx.attempt,
1108            };
1109            let invocation_envelope =
1110                build_invocation_envelope(meta, call.operation.as_str(), call.input)
1111                    .context("build invocation envelope for component call")?;
1112            serde_json::to_string(&invocation_envelope)?
1113        };
1114        let config_json = if call.config.is_null() {
1115            None
1116        } else {
1117            Some(serde_json::to_string(&call.config)?)
1118        };
1119
1120        let exec_ctx = component_exec_ctx(ctx, node_id);
1121        #[cfg(feature = "fault-injection")]
1122        {
1123            let fault_ctx = FaultContext {
1124                pack_id: ctx.pack_id,
1125                flow_id: ctx.flow_id,
1126                node_id: Some(node_id),
1127                attempt: ctx.attempt,
1128            };
1129            maybe_fail(FaultPoint::BeforeComponentCall, fault_ctx)
1130                .map_err(|err| anyhow!(err.to_string()))?;
1131        }
1132        let value = pack
1133            .invoke_component(
1134                call.component_ref.as_str(),
1135                exec_ctx,
1136                call.operation.as_str(),
1137                config_json,
1138                input_json,
1139            )
1140            .await?;
1141        #[cfg(feature = "fault-injection")]
1142        {
1143            let fault_ctx = FaultContext {
1144                pack_id: ctx.pack_id,
1145                flow_id: ctx.flow_id,
1146                node_id: Some(node_id),
1147                attempt: ctx.attempt,
1148            };
1149            maybe_fail(FaultPoint::AfterComponentCall, fault_ctx)
1150                .map_err(|err| anyhow!(err.to_string()))?;
1151        }
1152
1153        if let Some((code, message)) = component_error(&value) {
1154            bail!(
1155                "component {} failed: {}: {}",
1156                call.component_ref,
1157                code,
1158                message
1159            );
1160        }
1161        // MCP-shaped tool errors (greentic-mcp-generator's tool_error_with_status)
1162        // come back as a top-level `{ "error": { "code", "message", "status" } }`
1163        // value with the WIT envelope still ok=true (because the wasm guest
1164        // returned normally). Treat them the same as a component_error so the
1165        // engine error-envelope lift path surfaces the failure to the user.
1166        if let Some((code, message)) = mcp_tool_error(&value) {
1167            bail!(
1168                "component {} returned tool error: {}: {}",
1169                call.component_ref,
1170                code,
1171                message
1172            );
1173        }
1174        Ok(NodeOutput::new(value))
1175    }
1176
1177    async fn execute_provider_invoke(
1178        &self,
1179        ctx: &FlowContext<'_>,
1180        node_id: &str,
1181        state: &ExecutionState,
1182        payload: Value,
1183        event: &NodeEvent<'_>,
1184    ) -> Result<NodeOutput> {
1185        #[derive(Deserialize)]
1186        struct ProviderPayload {
1187            #[serde(default)]
1188            provider_id: Option<String>,
1189            #[serde(default)]
1190            provider_type: Option<String>,
1191            #[serde(default, alias = "operation")]
1192            op: Option<String>,
1193            #[serde(default)]
1194            input: Value,
1195            #[serde(default)]
1196            in_map: Value,
1197            #[serde(default)]
1198            out_map: Value,
1199            #[serde(default)]
1200            err_map: Value,
1201        }
1202
1203        let payload: ProviderPayload =
1204            serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
1205        let op = payload
1206            .op
1207            .as_deref()
1208            .filter(|v| !v.trim().is_empty())
1209            .with_context(|| "provider.invoke requires an op")?
1210            .to_string();
1211
1212        let prev = state
1213            .last_output
1214            .as_ref()
1215            .cloned()
1216            .unwrap_or_else(|| Value::Object(JsonMap::new()));
1217        let base_ctx = template_context(state, prev);
1218
1219        let input_value = if !payload.in_map.is_null() {
1220            let mut ctx_value = base_ctx.clone();
1221            if let Value::Object(ref mut map) = ctx_value {
1222                map.insert("input".into(), payload.input.clone());
1223                map.insert("result".into(), payload.input.clone());
1224            }
1225            render_template_value(
1226                &payload.in_map,
1227                &ctx_value,
1228                TemplateOptions {
1229                    allow_pointer: true,
1230                },
1231            )
1232            .context("failed to render provider.invoke in_map")?
1233        } else if !payload.input.is_null() {
1234            payload.input
1235        } else {
1236            Value::Null
1237        };
1238        let input_json = serde_json::to_vec(&input_value)?;
1239
1240        self.validate_tool(
1241            ctx,
1242            event,
1243            payload.provider_id.as_deref(),
1244            payload.provider_type.as_deref(),
1245            &op,
1246            &input_value,
1247        )?;
1248
1249        let key = FlowKey {
1250            pack_id: ctx.pack_id.to_string(),
1251            flow_id: ctx.flow_id.to_string(),
1252        };
1253        let pack_idx = *self.flow_sources.get(&key).with_context(|| {
1254            format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
1255        })?;
1256        let pack = Arc::clone(&self.packs[pack_idx]);
1257        let binding = pack.resolve_provider(
1258            payload.provider_id.as_deref(),
1259            payload.provider_type.as_deref(),
1260        );
1261
1262        // If pack-local resolution fails, try the cross-pack resolver (capability registry).
1263        if binding.is_err()
1264            && let Some(output) = self.try_invoke_cross_pack_resolver(
1265                payload.provider_id.as_deref(),
1266                payload.provider_type.as_deref(),
1267                &op,
1268                &input_json,
1269                ctx.tenant,
1270            )?
1271        {
1272            return Ok(output);
1273        }
1274
1275        let binding = binding?;
1276        let exec_ctx = component_exec_ctx(ctx, node_id);
1277        #[cfg(feature = "fault-injection")]
1278        {
1279            let fault_ctx = FaultContext {
1280                pack_id: ctx.pack_id,
1281                flow_id: ctx.flow_id,
1282                node_id: Some(node_id),
1283                attempt: ctx.attempt,
1284            };
1285            maybe_fail(FaultPoint::BeforeToolCall, fault_ctx)
1286                .map_err(|err| anyhow!(err.to_string()))?;
1287        }
1288        let provider_metric_id = payload
1289            .provider_id
1290            .as_deref()
1291            .or(payload.provider_type.as_deref())
1292            .unwrap_or("unknown");
1293        let invoke_started = std::time::Instant::now();
1294        let invoke_result = pack
1295            .invoke_provider(&binding, exec_ctx, &op, input_json)
1296            .await;
1297        let invoke_duration_ms = invoke_started.elapsed().as_secs_f64() * 1000.0;
1298        crate::metrics::record_provider_invocation(
1299            ctx.tenant,
1300            provider_metric_id,
1301            &op,
1302            if invoke_result.is_ok() { "ok" } else { "err" },
1303            invoke_duration_ms,
1304        );
1305        let result = invoke_result?;
1306        #[cfg(feature = "fault-injection")]
1307        {
1308            let fault_ctx = FaultContext {
1309                pack_id: ctx.pack_id,
1310                flow_id: ctx.flow_id,
1311                node_id: Some(node_id),
1312                attempt: ctx.attempt,
1313            };
1314            maybe_fail(FaultPoint::AfterToolCall, fault_ctx)
1315                .map_err(|err| anyhow!(err.to_string()))?;
1316        }
1317
1318        let output = if payload.out_map.is_null() {
1319            result
1320        } else {
1321            let mut ctx_value = base_ctx;
1322            if let Value::Object(ref mut map) = ctx_value {
1323                map.insert("input".into(), result.clone());
1324                map.insert("result".into(), result.clone());
1325            }
1326            render_template_value(
1327                &payload.out_map,
1328                &ctx_value,
1329                TemplateOptions {
1330                    allow_pointer: true,
1331                },
1332            )
1333            .context("failed to render provider.invoke out_map")?
1334        };
1335        let _ = payload.err_map;
1336        Ok(NodeOutput::new(output))
1337    }
1338
1339    fn try_invoke_cross_pack_resolver(
1340        &self,
1341        provider_id: Option<&str>,
1342        provider_type: Option<&str>,
1343        op: &str,
1344        input_json: &[u8],
1345        tenant: &str,
1346    ) -> Result<Option<NodeOutput>> {
1347        eprintln!(
1348            "[DEBUG] provider.invoke: pack-local failed, has_resolver={}",
1349            self.cross_pack_resolver.is_some()
1350        );
1351        let Some(resolver) = self.cross_pack_resolver.as_ref() else {
1352            return Ok(None);
1353        };
1354        let provider_id = provider_id.unwrap_or("unknown");
1355        tracing::info!(
1356            provider_id,
1357            op = %op,
1358            "provider.invoke: pack-local resolution failed, trying cross-pack resolver"
1359        );
1360        let result_value =
1361            resolver.invoke(provider_id, provider_type, op, input_json, tenant, None)?;
1362        Ok(Some(NodeOutput::new(result_value)))
1363    }
1364
1365    fn validate_component(
1366        &self,
1367        ctx: &FlowContext<'_>,
1368        event: &NodeEvent<'_>,
1369        call: &ComponentCall,
1370    ) -> Result<()> {
1371        if self.validation.mode == ValidationMode::Off {
1372            return Ok(());
1373        }
1374        let mut metadata = JsonMap::new();
1375        metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
1376        if let Some(id) = ctx.session_id {
1377            metadata.insert("session".to_string(), json!({ "id": id }));
1378        }
1379        let envelope = json!({
1380            "component_id": call.component_ref,
1381            "operation": call.operation,
1382            "input": call.input,
1383            "config": call.config,
1384            "metadata": Value::Object(metadata),
1385        });
1386        let issues = validate_component_envelope(&envelope);
1387        self.report_validation(ctx, event, "component", issues)
1388    }
1389
1390    fn validate_tool(
1391        &self,
1392        ctx: &FlowContext<'_>,
1393        event: &NodeEvent<'_>,
1394        provider_id: Option<&str>,
1395        provider_type: Option<&str>,
1396        operation: &str,
1397        input: &Value,
1398    ) -> Result<()> {
1399        if self.validation.mode == ValidationMode::Off {
1400            return Ok(());
1401        }
1402        let tool_id = provider_id.or(provider_type).unwrap_or("provider.invoke");
1403        let mut metadata = JsonMap::new();
1404        metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
1405        if let Some(id) = ctx.session_id {
1406            metadata.insert("session".to_string(), json!({ "id": id }));
1407        }
1408        let envelope = json!({
1409            "tool_id": tool_id,
1410            "operation": operation,
1411            "input": input,
1412            "metadata": Value::Object(metadata),
1413        });
1414        let issues = validate_tool_envelope(&envelope);
1415        self.report_validation(ctx, event, "tool", issues)
1416    }
1417
1418    fn report_validation(
1419        &self,
1420        ctx: &FlowContext<'_>,
1421        event: &NodeEvent<'_>,
1422        kind: &str,
1423        issues: Vec<ValidationIssue>,
1424    ) -> Result<()> {
1425        if issues.is_empty() {
1426            return Ok(());
1427        }
1428        if let Some(observer) = ctx.observer {
1429            observer.on_validation(event, &issues);
1430        }
1431        match self.validation.mode {
1432            ValidationMode::Warn => {
1433                tracing::warn!(
1434                    tenant = ctx.tenant,
1435                    flow_id = ctx.flow_id,
1436                    node_id = event.node_id,
1437                    kind,
1438                    issues = ?issues,
1439                    "invocation envelope validation issues"
1440                );
1441                Ok(())
1442            }
1443            ValidationMode::Error => {
1444                tracing::error!(
1445                    tenant = ctx.tenant,
1446                    flow_id = ctx.flow_id,
1447                    node_id = event.node_id,
1448                    kind,
1449                    issues = ?issues,
1450                    "invocation envelope validation failed"
1451                );
1452                bail!("invocation_validation_failed");
1453            }
1454            ValidationMode::Off => Ok(()),
1455        }
1456    }
1457
    /// Returns the descriptors of all flows held by this engine.
    pub fn flows(&self) -> &[FlowDescriptor] {
        &self.flows
    }
1461
1462    pub fn flow_by_key(&self, pack_id: &str, flow_id: &str) -> Option<&FlowDescriptor> {
1463        self.flows
1464            .iter()
1465            .find(|descriptor| descriptor.pack_id == pack_id && descriptor.id == flow_id)
1466    }
1467
1468    pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
1469        let mut matches = self
1470            .flows
1471            .iter()
1472            .filter(|descriptor| descriptor.flow_type == flow_type);
1473        let first = matches.next()?;
1474        if matches.next().is_some() {
1475            return None;
1476        }
1477        Some(first)
1478    }
1479
1480    pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
1481        let mut matches = self
1482            .flows
1483            .iter()
1484            .filter(|descriptor| descriptor.id == flow_id);
1485        let first = matches.next()?;
1486        if matches.next().is_some() {
1487            return None;
1488        }
1489        Some(first)
1490    }
1491}
1492
/// Hooks for observing flow execution at node granularity.
///
/// Implementations must be `Send + Sync`.
pub trait ExecutionObserver: Send + Sync {
    /// Node-start hook. NOTE(review): presumably invoked before a node is
    /// dispatched — the call site is outside this section; confirm.
    fn on_node_start(&self, event: &NodeEvent<'_>);
    /// Node-end hook, receiving the node's output value. NOTE(review): call
    /// site is outside this section; confirm when exactly it fires.
    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
    /// Node-error hook, receiving the error. NOTE(review): call site is
    /// outside this section; confirm when exactly it fires.
    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
    /// Called with the non-empty list of envelope validation issues found
    /// for a node's invocation. The default implementation does nothing.
    fn on_validation(&self, _event: &NodeEvent<'_>, _issues: &[ValidationIssue]) {}
}
1499
/// Borrowed description of a node, handed to [`ExecutionObserver`] hooks.
pub struct NodeEvent<'a> {
    /// Flow context associated with the node.
    pub context: &'a FlowContext<'a>,
    /// Identifier of the node (also used as the `node_id` field in
    /// validation log records).
    pub node_id: &'a str,
    /// The node definition.
    pub node: &'a HostNode,
    /// Payload associated with the node. NOTE(review): presumably the
    /// payload the node is invoked with — confirm against the constructor.
    pub payload: &'a Value,
}
1506
/// Serializable state of one flow execution.
///
/// Every field has a serde default, so a state deserializes even when fields
/// are missing.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionState {
    /// Input the execution was created with; passed to card-locale
    /// injection during dispatch.
    #[serde(default)]
    entry: Value,
    /// Current input; replaced with the jump payload when a jump is applied.
    #[serde(default)]
    input: Value,
    /// Node outputs keyed by node id.
    #[serde(default)]
    nodes: HashMap<String, NodeOutput>,
    /// Values emitted so far; folded into the final output by
    /// `finalize_with`.
    #[serde(default)]
    egress: Vec<Value>,
    /// Most recent output, when there is one; omitted from the serialized
    /// form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    last_output: Option<Value>,
    /// Number of jumps applied so far, checked against the jump's redirect
    /// limit.
    #[serde(default)]
    redirect_count: u32,
}
1522
1523impl ExecutionState {
1524    fn new(input: Value) -> Self {
1525        Self {
1526            entry: input.clone(),
1527            input,
1528            nodes: HashMap::new(),
1529            egress: Vec::new(),
1530            last_output: None,
1531            redirect_count: 0,
1532        }
1533    }
1534
1535    /// Refresh `entry` from `input` if the snapshot was loaded without an
1536    /// entry value. Kept for backwards compatibility with snapshots
1537    /// persisted before the entry-refresh fix in `FlowEngine::resume`.
1538    #[allow(dead_code)]
1539    fn ensure_entry(&mut self) {
1540        if self.entry.is_null() {
1541            self.entry = self.input.clone();
1542        }
1543    }
1544
1545    fn context(&self) -> Value {
1546        let mut nodes = JsonMap::new();
1547        for (id, output) in &self.nodes {
1548            nodes.insert(
1549                id.clone(),
1550                json!({
1551                    "ok": output.ok,
1552                    "payload": output.payload.clone(),
1553                    "meta": output.meta.clone(),
1554                }),
1555            );
1556        }
1557        json!({
1558            "entry": self.entry.clone(),
1559            "input": self.input.clone(),
1560            "nodes": nodes,
1561            "redirect_count": self.redirect_count,
1562        })
1563    }
1564
1565    fn outputs_map(&self) -> JsonMap<String, Value> {
1566        let mut outputs = JsonMap::new();
1567        for (id, output) in &self.nodes {
1568            outputs.insert(id.clone(), output.payload.clone());
1569        }
1570        outputs
1571    }
1572    fn push_egress(&mut self, payload: Value) {
1573        self.egress.push(payload);
1574    }
1575
1576    fn replace_input(&mut self, input: Value) {
1577        self.input = input;
1578    }
1579
1580    fn clear_egress(&mut self) {
1581        self.egress.clear();
1582    }
1583
1584    fn redirect_count(&self) -> u32 {
1585        self.redirect_count
1586    }
1587
1588    fn increment_redirect_count(&mut self) {
1589        self.redirect_count = self.redirect_count.saturating_add(1);
1590    }
1591
1592    fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
1593        if self.egress.is_empty() {
1594            return final_payload.unwrap_or(Value::Null);
1595        }
1596        let mut emitted = std::mem::take(&mut self.egress);
1597        if let Some(value) = final_payload {
1598            match value {
1599                Value::Null => {}
1600                Value::Array(items) => emitted.extend(items),
1601                other => emitted.push(other),
1602            }
1603        }
1604        Value::Array(emitted)
1605    }
1606}
1607
/// Result recorded for a single node.
///
/// `ExecutionState::context` exposes exactly these three fields per node.
1608#[derive(Clone, Debug, Serialize, Deserialize)]
1609struct NodeOutput {
    /// `true` for outputs built by `new` / `with_meta`, `false` for
    /// `with_error`.
1610    ok: bool,
    /// The node's output value (`Null` for error outputs).
1611    payload: Value,
    /// Side-channel data: `Null` by default, jump hints, or an `error`
    /// object when built via `with_error`.
1612    meta: Value,
1613}
1614
1615impl NodeOutput {
1616    fn new(payload: Value) -> Self {
1617        Self {
1618            ok: true,
1619            payload,
1620            meta: Value::Null,
1621        }
1622    }
1623
1624    /// `ok=false` output stashing error context in `meta.error`. Currently
1625    /// only used by `lift_first_node_error_from_nodes` tests — kept around so
1626    /// drive_flow can resume populating it once we have a hook for it.
1627    #[allow(dead_code)]
1628    fn with_error(node_id: &str, err: &(dyn std::error::Error + 'static)) -> Self {
1629        Self {
1630            ok: false,
1631            payload: Value::Null,
1632            meta: json!({
1633                "error": {
1634                    "kind": "flow_node_failed",
1635                    "message": err.to_string(),
1636                    "node_id": node_id,
1637                }
1638            }),
1639        }
1640    }
1641}
1642
/// A node's output paired with the control signal that accompanies it.
1643struct DispatchOutcome {
    /// Output produced for the node.
1644    output: NodeOutput,
    /// Control signal attached to the output (`Continue` when built via
    /// `DispatchOutcome::complete`).
1645    control: NodeControl,
1646}
1647
1648impl DispatchOutcome {
1649    fn complete(output: NodeOutput) -> Self {
1650        Self {
1651            output,
1652            control: NodeControl::Continue,
1653        }
1654    }
1655
1656    fn wait(output: NodeOutput, reason: Option<String>) -> Self {
1657        Self {
1658            output,
1659            control: NodeControl::Wait { reason },
1660        }
1661    }
1662
1663    fn with_control(output: NodeOutput, control: NodeControl) -> Self {
1664        Self { output, control }
1665    }
1666}
1667
/// Control signal carried alongside a node's output in `DispatchOutcome`.
1668#[derive(Clone, Debug)]
1669enum NodeControl {
    /// No special control; used by `DispatchOutcome::complete`.
1670    Continue,
    /// Used by `DispatchOutcome::wait`; `reason` is optional free text.
1671    Wait {
1672        reason: Option<String>,
1673    },
    /// Jump request, parsed from a `greentic_control` object whose `action`
    /// is `"jump"`.
1674    Jump(JumpControl),
    /// Parsed from a `greentic_control` object whose `action` is
    /// `"respond"`; every field is optional.
1675    Respond {
        /// `greentic_control.text`, when it is a string.
1676        text: Option<String>,
        /// `greentic_control.card_cbor`, when it is an array; only entries
        /// that fit in a `u8` are kept.
1677        card_cbor: Option<Vec<u8>>,
        /// `greentic_control.needs_user`, when it is a boolean.
1678        needs_user: Option<bool>,
1679    },
1680}
1681
/// Fields of a `greentic_control` jump request as parsed by
/// `parse_component_control`.
1682#[derive(Clone, Debug)]
1683struct JumpControl {
    /// Target flow; required, trimmed, and non-empty.
1684    flow: String,
    /// Optional target node; blank values are parsed as `None`.
1685    node: Option<String>,
    /// `payload` from the control object, `Null` when absent.
1686    payload: Value,
    /// `hints` from the control object, `Null` when absent.
1687    hints: Value,
    /// `max_redirects` when present and representable as `u32`.
1688    max_redirects: Option<u32>,
    /// Optional free-text `reason`.
1689    reason: Option<String>,
1690}
1691
/// A flow id, the flow itself, and a node id within it.
// NOTE(review): presumably the resolved destination of a `JumpControl`; the
// code that builds and consumes it is outside this part of the file — confirm.
1692#[derive(Clone, Debug)]
1693struct JumpTarget {
1694    flow_id: String,
1695    flow: HostFlow,
1696    node_id: NodeId,
1697}
1698
1699impl NodeOutput {
1700    fn with_meta(payload: Value, meta: Value) -> Self {
1701        Self {
1702            ok: true,
1703            payload,
1704            meta,
1705        }
1706    }
1707}
1708
1709fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
1710    ComponentExecCtx {
1711        tenant: ComponentTenantCtx {
1712            tenant: ctx.tenant.to_string(),
1713            team: None,
1714            user: ctx.provider_id.map(str::to_string),
1715            trace_id: None,
1716            i18n_id: None,
1717            correlation_id: ctx.session_id.map(str::to_string),
1718            deadline_unix_ms: None,
1719            attempt: ctx.attempt,
1720            idempotency_key: ctx.session_id.map(str::to_string),
1721        },
1722        i18n_id: None,
1723        flow_id: ctx.flow_id.to_string(),
1724        node_id: Some(node_id.to_string()),
1725    }
1726}
1727
1728fn component_error(value: &Value) -> Option<(String, String)> {
1729    let obj = value.as_object()?;
1730    let ok = obj.get("ok").and_then(Value::as_bool)?;
1731    if ok {
1732        return None;
1733    }
1734    let err = obj.get("error")?.as_object()?;
1735    let code = err
1736        .get("code")
1737        .and_then(Value::as_str)
1738        .unwrap_or("component_error");
1739    let message = err
1740        .get("message")
1741        .and_then(Value::as_str)
1742        .unwrap_or("component reported error");
1743    Some((code.to_string(), message.to_string()))
1744}
1745
1746/// MCP tool-error wire shape from greentic-mcp-generator's `tool_error_with_status`:
1747/// `{ "error": { "code", "message", "status" } }`. The component returned ok=true at
1748/// the WIT level (the HTTP failure was caught and serialized), so the regular
1749/// component_error path doesn't catch it.
1750fn mcp_tool_error(value: &Value) -> Option<(String, String)> {
1751    let obj = value.as_object()?;
1752    // Must be the error shape: no `result` field, just `error`.
1753    if obj.contains_key("result") {
1754        return None;
1755    }
1756    let err = obj.get("error")?.as_object()?;
1757    let code = err
1758        .get("code")
1759        .and_then(Value::as_str)
1760        .unwrap_or("tool_error");
1761    let raw_message = err
1762        .get("message")
1763        .and_then(Value::as_str)
1764        .unwrap_or("tool returned an error");
1765    let status = err.get("status").and_then(Value::as_u64);
1766    let message = match status {
1767        Some(s) => format!("{raw_message} (status {s})"),
1768        None => raw_message.to_string(),
1769    };
1770    Some((code.to_string(), message))
1771}
1772
1773fn extract_wait_reason(payload: &Value) -> Option<String> {
1774    match payload {
1775        Value::String(s) => Some(s.clone()),
1776        Value::Object(map) => map
1777            .get("reason")
1778            .and_then(Value::as_str)
1779            .map(|value| value.to_string()),
1780        _ => None,
1781    }
1782}
1783
1784fn component_dispatch_outcome(output: NodeOutput) -> Result<DispatchOutcome> {
1785    if let Some(control) = parse_component_control(&output.payload)? {
1786        return Ok(match control {
1787            NodeControl::Jump(jump) => {
1788                let adjusted = NodeOutput::with_meta(jump.payload.clone(), jump.hints.clone());
1789                DispatchOutcome::with_control(adjusted, NodeControl::Jump(jump))
1790            }
1791            NodeControl::Respond {
1792                text,
1793                card_cbor,
1794                needs_user,
1795            } => DispatchOutcome::with_control(
1796                output,
1797                NodeControl::Respond {
1798                    text,
1799                    card_cbor,
1800                    needs_user,
1801                },
1802            ),
1803            other => DispatchOutcome::with_control(output, other),
1804        });
1805    }
1806    Ok(DispatchOutcome::complete(output))
1807}
1808
1809fn parse_component_control(payload: &Value) -> Result<Option<NodeControl>> {
1810    let Value::Object(map) = payload else {
1811        return Ok(None);
1812    };
1813    let Some(control_value) = map.get("greentic_control") else {
1814        return Ok(None);
1815    };
1816    let control = control_value
1817        .as_object()
1818        .ok_or_else(|| anyhow!("jump_failed: greentic_control must be an object"))?;
1819    let action = control
1820        .get("action")
1821        .and_then(Value::as_str)
1822        .ok_or_else(|| anyhow!("jump_failed: greentic_control.action is required"))?;
1823    let version = control
1824        .get("v")
1825        .and_then(Value::as_u64)
1826        .ok_or_else(|| anyhow!("jump_failed: greentic_control.v is required"))?;
1827    if version != 1 {
1828        bail!("jump_failed: unsupported greentic_control.v={version}");
1829    }
1830
1831    match action {
1832        "jump" => {
1833            let flow = control
1834                .get("flow")
1835                .and_then(Value::as_str)
1836                .map(str::trim)
1837                .filter(|value| !value.is_empty())
1838                .ok_or_else(|| anyhow!("jump_failed: jump flow is required"))?
1839                .to_string();
1840            let node = control
1841                .get("node")
1842                .and_then(Value::as_str)
1843                .map(str::trim)
1844                .filter(|value| !value.is_empty())
1845                .map(str::to_string);
1846            let payload = control.get("payload").cloned().unwrap_or(Value::Null);
1847            let hints = control.get("hints").cloned().unwrap_or(Value::Null);
1848            let max_redirects = control
1849                .get("max_redirects")
1850                .and_then(Value::as_u64)
1851                .and_then(|value| u32::try_from(value).ok());
1852            let reason = control
1853                .get("reason")
1854                .and_then(Value::as_str)
1855                .map(str::to_string);
1856            Ok(Some(NodeControl::Jump(JumpControl {
1857                flow,
1858                node,
1859                payload,
1860                hints,
1861                max_redirects,
1862                reason,
1863            })))
1864        }
1865        "respond" => {
1866            let text = control
1867                .get("text")
1868                .and_then(Value::as_str)
1869                .map(str::to_string);
1870            let card_cbor = control
1871                .get("card_cbor")
1872                .and_then(Value::as_array)
1873                .map(|bytes| {
1874                    bytes
1875                        .iter()
1876                        .filter_map(Value::as_u64)
1877                        .filter_map(|value| u8::try_from(value).ok())
1878                        .collect::<Vec<_>>()
1879                });
1880            let needs_user = control.get("needs_user").and_then(Value::as_bool);
1881            Ok(Some(NodeControl::Respond {
1882                text,
1883                card_cbor,
1884                needs_user,
1885            }))
1886        }
1887        _ => Ok(None),
1888    }
1889}
1890
1891fn template_context(state: &ExecutionState, prev: Value) -> Value {
1892    let entry = if state.entry.is_null() {
1893        Value::Object(JsonMap::new())
1894    } else {
1895        state.entry.clone()
1896    };
1897    let mut ctx = JsonMap::new();
1898    ctx.insert("entry".into(), entry.clone());
1899    ctx.insert("in".into(), entry); // alias for entry - used in flow templates
1900    ctx.insert("prev".into(), prev);
1901    ctx.insert("node".into(), Value::Object(state.outputs_map()));
1902    ctx.insert("state".into(), state.context());
1903    Value::Object(ctx)
1904}
1905
1906impl From<Flow> for HostFlow {
1907    fn from(value: Flow) -> Self {
1908        let mut nodes = IndexMap::new();
1909        for (id, node) in value.nodes {
1910            nodes.insert(id.clone(), HostNode::from(node));
1911        }
1912        let start = value
1913            .entrypoints
1914            .get("default")
1915            .and_then(Value::as_str)
1916            .and_then(|id| NodeId::from_str(id).ok())
1917            .or_else(|| nodes.keys().next().cloned());
1918        // Extract flow-level slot_schema from metadata.extra (Phase D).
1919        // The producer side (greentic-flow compile_flow) stores it under
1920        // "greentic.slot_schema" when the FlowDoc has a `slot_schema` field.
1921        let slot_schema = value
1922            .metadata
1923            .extra
1924            .get(SLOT_SCHEMA_METADATA_KEY)
1925            .filter(|v| !v.is_null())
1926            .cloned();
1927        Self {
1928            id: value.id.as_str().to_string(),
1929            start,
1930            nodes,
1931            slot_schema,
1932        }
1933    }
1934}
1935
1936impl From<Node> for HostNode {
1937    fn from(node: Node) -> Self {
1938        let full_ref = node.component.id.as_str().to_string();
1939        let operation_in_mapping = extract_operation_from_mapping(&node.input.mapping);
1940        // A dotted component id is only a packed "<component>.<operation>" string
1941        // when the operation isn't carried structurally elsewhere. greentic-pack
1942        // resolves a component node to a bare component symbol (e.g.
1943        // `ai.greentic.component-templates`) and keeps the operation in the input
1944        // mapping, so splitting on the last dot here would corrupt the reference
1945        // (→ `ai.greentic`, "not found in pack"). Prefer the structured operation —
1946        // from `component.operation` or the input mapping — and only fall back to
1947        // the legacy single-ID split when neither is present.
1948        let is_builtin = full_ref.starts_with("component.exec")
1949            || full_ref.starts_with("flow.")
1950            || full_ref.starts_with("emit.")
1951            || full_ref.starts_with("session.")
1952            || full_ref.starts_with("provider.");
1953        let (component_ref, raw_operation) =
1954            if node.component.operation.is_some() || is_builtin || operation_in_mapping.is_some() {
1955                (full_ref, node.component.operation.clone())
1956            } else if let Some(dot) = full_ref.rfind('.') {
1957                let comp = full_ref[..dot].to_string();
1958                let op = full_ref[dot + 1..].to_string();
1959                (comp, Some(op))
1960            } else {
1961                (full_ref, None)
1962            };
1963        let operation_is_component_exec = raw_operation.as_deref() == Some("component.exec");
1964        let operation_is_emit = raw_operation
1965            .as_deref()
1966            .map(|op| op.starts_with("emit."))
1967            .unwrap_or(false);
1968        let is_component_exec = component_ref == "component.exec" || operation_is_component_exec;
1969
1970        let kind = if is_component_exec {
1971            let target = if component_ref == "component.exec" {
1972                if let Some(op) = raw_operation
1973                    .as_deref()
1974                    .filter(|op| op.starts_with("emit."))
1975                {
1976                    op.to_string()
1977                } else {
1978                    extract_target_component(&node.input.mapping)
1979                        .unwrap_or_else(|| "component.exec".to_string())
1980                }
1981            } else {
1982                extract_target_component(&node.input.mapping)
1983                    .unwrap_or_else(|| component_ref.clone())
1984            };
1985            if target.starts_with("emit.") {
1986                NodeKind::BuiltinEmit {
1987                    kind: emit_kind_from_ref(&target),
1988                }
1989            } else {
1990                NodeKind::Exec {
1991                    target_component: target,
1992                }
1993            }
1994        } else if operation_is_emit {
1995            NodeKind::BuiltinEmit {
1996                kind: emit_kind_from_ref(raw_operation.as_deref().unwrap_or("emit.log")),
1997            }
1998        } else {
1999            match component_ref.as_str() {
2000                "flow.call" => NodeKind::FlowCall,
2001                "provider.invoke" => NodeKind::ProviderInvoke,
2002                "session.wait" => NodeKind::Wait,
2003                "state.get" => NodeKind::BuiltinStateGet,
2004                "state.set" => NodeKind::BuiltinStateSet,
2005                comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
2006                    kind: emit_kind_from_ref(comp),
2007                },
2008                other => NodeKind::PackComponent {
2009                    component_ref: other.to_string(),
2010                },
2011            }
2012        };
2013        let component_label = match &kind {
2014            NodeKind::Exec { .. } => "component.exec".to_string(),
2015            NodeKind::PackComponent { component_ref } => component_ref.clone(),
2016            NodeKind::ProviderInvoke => "provider.invoke".to_string(),
2017            NodeKind::FlowCall => "flow.call".to_string(),
2018            NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
2019            NodeKind::BuiltinStateGet => "state.get".to_string(),
2020            NodeKind::BuiltinStateSet => "state.set".to_string(),
2021            NodeKind::Wait => "session.wait".to_string(),
2022        };
2023        let operation_name = if is_component_exec && operation_is_component_exec {
2024            None
2025        } else {
2026            raw_operation.clone()
2027        };
2028        let payload_expr = match kind {
2029            NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
2030            _ => node.input.mapping.clone(),
2031        };
2032        Self {
2033            kind,
2034            component: component_label,
2035            component_id: if is_component_exec {
2036                "component.exec".to_string()
2037            } else {
2038                component_ref
2039            },
2040            operation_name,
2041            operation_in_mapping,
2042            payload_expr,
2043            routing: node.routing,
2044        }
2045    }
2046}
2047
2048fn extract_target_component(payload: &Value) -> Option<String> {
2049    match payload {
2050        Value::Object(map) => map
2051            .get("component")
2052            .or_else(|| map.get("component_ref"))
2053            .and_then(Value::as_str)
2054            .map(|s| s.to_string()),
2055        _ => None,
2056    }
2057}
2058
2059fn extract_operation_from_mapping(payload: &Value) -> Option<String> {
2060    match payload {
2061        Value::Object(map) => map
2062            .get("operation")
2063            .or_else(|| map.get("op"))
2064            .and_then(Value::as_str)
2065            .map(str::trim)
2066            .filter(|value| !value.is_empty())
2067            .map(|value| value.to_string()),
2068        _ => None,
2069    }
2070}
2071
2072fn extract_emit_payload(payload: &Value) -> Value {
2073    if let Value::Object(map) = payload {
2074        if let Some(input) = map.get("input") {
2075            return input.clone();
2076        }
2077        if let Some(inner) = map.get("payload") {
2078            return inner.clone();
2079        }
2080    }
2081    payload.clone()
2082}
2083
2084fn split_operation_payload(payload: Value) -> (Value, Value) {
2085    if let Value::Object(mut map) = payload.clone()
2086        && map.contains_key("input")
2087    {
2088        let input = map.remove("input").unwrap_or(Value::Null);
2089        let config = map.remove("config").unwrap_or(Value::Null);
2090        let legacy_only = map.keys().all(|key| {
2091            matches!(
2092                key.as_str(),
2093                "operation" | "op" | "component" | "component_ref"
2094            )
2095        });
2096        if legacy_only {
2097            return (input, config);
2098        }
2099    }
2100    (payload, Value::Null)
2101}
2102
2103fn resolve_component_operation(
2104    node_id: &str,
2105    component_label: &str,
2106    payload_operation: Option<String>,
2107    operation_override: Option<&str>,
2108    operation_in_mapping: Option<&str>,
2109) -> Result<String> {
2110    if let Some(op) = operation_override
2111        .map(str::trim)
2112        .filter(|value| !value.is_empty())
2113    {
2114        return Ok(op.to_string());
2115    }
2116
2117    if let Some(op) = payload_operation
2118        .as_deref()
2119        .map(str::trim)
2120        .filter(|value| !value.is_empty())
2121    {
2122        return Ok(op.to_string());
2123    }
2124
2125    let mut message = format!(
2126        "missing operation for node `{}` (component `{}`); expected node.component.operation to be set",
2127        node_id, component_label,
2128    );
2129    if let Some(found) = operation_in_mapping {
2130        message.push_str(&format!(
2131            ". Found operation in input.mapping (`{}`) but this is not used; pack compiler must preserve node.component.operation.",
2132            found
2133        ));
2134    }
2135    bail!(message);
2136}
2137
2138fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
2139    match component_ref {
2140        "emit.log" => EmitKind::Log,
2141        "emit.response" => EmitKind::Response,
2142        other => EmitKind::Other(other.to_string()),
2143    }
2144}
2145
2146fn emit_ref_from_kind(kind: &EmitKind) -> String {
2147    match kind {
2148        EmitKind::Log => "emit.log".to_string(),
2149        EmitKind::Response => "emit.response".to_string(),
2150        EmitKind::Other(other) => other.clone(),
2151    }
2152}
2153
2154/// Returns `true` when `input` looks like an Adaptive Card invocation
2155/// (contains `card_source` or `card_spec` at the top level).
2156fn is_card_invocation(input: &Value) -> bool {
2157    if let Value::Object(map) = input {
2158        return map.contains_key("card_source") || map.contains_key("card_spec");
2159    }
2160    false
2161}
2162
2163/// When the node config declares adaptive-card defaults (`default_card_asset`,
2164/// `default_card_inline`, or `default_source`) but the runtime invocation has
2165/// no `card_source`/`card_spec` yet, lift those defaults into the invocation.
2166/// This produces a schema-valid invocation envelope so the component does not
2167/// fall back to its generic "Welcome" placeholder.
2168///
2169/// Adaptive-card defaults can arrive in either of two places depending on how
2170/// the pack was compiled:
2171/// - top-level `call.config` (post `split_operation_payload`)
2172/// - nested `call.input.config` (when the node mapping kept the
2173///   `{component, config}` shape and `split_operation_payload` left it intact)
2174fn promote_card_config_to_invocation(input: &mut Value, config: &Value) {
2175    if is_card_invocation(input) {
2176        return;
2177    }
2178
2179    let cfg_map = card_defaults_source(input, config);
2180    let Some(cfg) = cfg_map else { return };
2181
2182    let default_asset = cfg
2183        .get("default_card_asset")
2184        .and_then(Value::as_str)
2185        .map(str::trim)
2186        .filter(|value| !value.is_empty())
2187        .map(str::to_string);
2188    let default_inline = cfg
2189        .get("default_card_inline")
2190        .filter(|value| value.is_object() || value.is_array())
2191        .cloned();
2192    let default_source = cfg
2193        .get("default_source")
2194        .and_then(Value::as_str)
2195        .map(str::trim)
2196        .filter(|value| !value.is_empty())
2197        .map(str::to_lowercase);
2198
2199    if default_asset.is_none() && default_inline.is_none() && default_source.is_none() {
2200        return;
2201    }
2202
2203    let card_source = default_source.unwrap_or_else(|| {
2204        if default_inline.is_some() {
2205            "inline".to_string()
2206        } else {
2207            "asset".to_string()
2208        }
2209    });
2210
2211    let mut card_spec = serde_json::Map::new();
2212    match card_source.as_str() {
2213        "asset" => {
2214            if let Some(path) = default_asset {
2215                card_spec.insert("asset_path".into(), Value::String(path));
2216            }
2217        }
2218        "inline" => {
2219            if let Some(inline) = default_inline {
2220                card_spec.insert("inline_json".into(), inline);
2221            }
2222        }
2223        _ => {}
2224    }
2225
2226    if !matches!(input, Value::Object(_)) {
2227        *input = Value::Object(serde_json::Map::new());
2228    }
2229    if let Value::Object(map) = input {
2230        map.insert("card_source".into(), Value::String(card_source));
2231        map.insert("card_spec".into(), Value::Object(card_spec));
2232    }
2233}
2234
2235/// Locate the adaptive-card defaults config object, preferring the top-level
2236/// `call.config` when present, then falling back to a nested `input.config`
2237/// (the shape produced when `split_operation_payload` leaves the mapping
2238/// intact).
2239fn card_defaults_source<'a>(
2240    input: &'a Value,
2241    config: &'a Value,
2242) -> Option<&'a serde_json::Map<String, Value>> {
2243    if let Value::Object(map) = config {
2244        return Some(map);
2245    }
2246    if let Value::Object(map) = input
2247        && let Some(Value::Object(nested)) = map.get("config")
2248    {
2249        return Some(nested);
2250    }
2251    None
2252}
2253
2254fn inject_card_locale(payload: &mut Value, entry: &Value) {
2255    if !is_card_invocation(payload) {
2256        return;
2257    }
2258    let Value::Object(map) = payload else { return };
2259    if map.contains_key("locale") {
2260        return;
2261    }
2262    let locale = entry
2263        .pointer("/input/metadata/locale")
2264        .or_else(|| entry.pointer("/metadata/locale"))
2265        .and_then(Value::as_str);
2266    if let Some(locale) = locale {
2267        map.insert("locale".into(), Value::String(locale.to_string()));
2268    }
2269}
2270
2271/// Inject flow-level `slot_schema` as `slot_definitions` into the
2272/// slot-extractor component's input value. Skips injection when the input
2273/// already contains an explicit `slot_definitions` key (back-compat with
2274/// M2.4 NDA demo inline definitions). When the input is `Null`, promotes it
2275/// to an empty object first.
2276fn inject_slot_definitions(input: &mut Value, slot_schema: &Value, flow_id: &str, node_id: &str) {
2277    if input.is_null() {
2278        *input = Value::Object(serde_json::Map::new());
2279    }
2280    let Some(map) = input.as_object_mut() else {
2281        tracing::warn!(
2282            flow_id,
2283            node_id,
2284            "slot-extractor input is not an object; cannot inject slot_definitions"
2285        );
2286        return;
2287    };
2288    if map.contains_key("slot_definitions") {
2289        return;
2290    }
2291    let slot_count = slot_schema.as_array().map_or(0, Vec::len);
2292    tracing::debug!(
2293        flow_id,
2294        slot_count,
2295        "injecting flow-level slot_schema as slot_definitions into slot-extractor input"
2296    );
2297    map.insert("slot_definitions".to_string(), slot_schema.clone());
2298}
2299
2300/// Pre-resolve `card_source: "asset"` entries by reading the referenced JSON
2301/// file from the pack's assets directory and converting to
2302/// `card_source: "inline"` with `inline_json` populated.
2303///
2304/// This handles both top-level card fields and the nested `call.payload`
2305/// structure emitted by cards2pack.
2306fn resolve_card_assets(input: &mut Value, pack: &crate::pack::PackRuntime) {
2307    resolve_card_spec_asset(input, pack);
2308
2309    // Also resolve inside `call.payload` (cards2pack duplicates the card
2310    // invocation there).
2311    if let Value::Object(map) = input
2312        && let Some(Value::Object(call)) = map.get_mut("call")
2313        && let Some(payload) = call.get_mut("payload")
2314    {
2315        resolve_card_spec_asset(payload, pack);
2316    }
2317}
2318
2319/// Resolve a single card_spec asset_path → inline_json.
2320fn resolve_card_spec_asset(value: &mut Value, pack: &crate::pack::PackRuntime) {
2321    let Value::Object(map) = value else { return };
2322
2323    let is_asset = map
2324        .get("card_source")
2325        .and_then(Value::as_str)
2326        .map(|s| s.eq_ignore_ascii_case("asset"))
2327        .unwrap_or(false);
2328    if !is_asset {
2329        return;
2330    }
2331
2332    let asset_path = map
2333        .get("card_spec")
2334        .and_then(|spec| spec.get("asset_path"))
2335        .and_then(Value::as_str)
2336        .map(str::to_string);
2337
2338    let Some(asset_path) = asset_path else { return };
2339
2340    match pack.read_asset(&asset_path) {
2341        Ok(bytes) => {
2342            let card_json: Value = match serde_json::from_slice(&bytes) {
2343                Ok(v) => v,
2344                Err(err) => {
2345                    tracing::warn!(
2346                        asset_path,
2347                        %err,
2348                        "failed to parse card asset as JSON; leaving as asset reference"
2349                    );
2350                    return;
2351                }
2352            };
2353            tracing::debug!(asset_path, "pre-resolved card asset to inline_json");
2354            map.insert("card_source".into(), Value::String("inline".into()));
2355            if let Some(Value::Object(spec)) = map.get_mut("card_spec") {
2356                spec.insert("inline_json".into(), card_json);
2357                spec.remove("asset_path");
2358            }
2359        }
2360        Err(err) => {
2361            tracing::warn!(
2362                asset_path,
2363                %err,
2364                "card asset not found in pack; leaving as asset reference"
2365            );
2366        }
2367    }
2368
2369    // Pre-resolve i18n bundle: the WASM component cannot read pack assets
2370    // directly (no host resolver registered), so inline the i18n JSON into
2371    // the invocation under `card_spec.i18n_inline`. Defense-in-depth: when
2372    // the card omits an explicit `i18n_bundle_path` we still try the
2373    // conventional `assets/i18n/` location so cards that rely on
2374    // auto-generated i18n keys (e.g. cards2pack output) keep working.
2375    let configured_bundle_path = map
2376        .get("card_spec")
2377        .and_then(|spec| spec.get("i18n_bundle_path"))
2378        .and_then(Value::as_str)
2379        .map(|s| s.trim().trim_end_matches('/').to_string())
2380        .filter(|s| !s.is_empty());
2381
2382    let bundle_path = configured_bundle_path
2383        .clone()
2384        .unwrap_or_else(|| "assets/i18n".to_string());
2385
2386    let i18n_entries = load_i18n_bundle_entries(&bundle_path, |path| pack.read_asset(path));
2387
2388    if !i18n_entries.is_empty() {
2389        let locale_keys: Vec<_> = i18n_entries.keys().cloned().collect();
2390        if let Some(Value::Object(spec)) = map.get_mut("card_spec") {
2391            spec.insert("i18n_inline".into(), Value::Object(i18n_entries));
2392            if configured_bundle_path.is_some() {
2393                tracing::info!(%bundle_path, ?locale_keys, "pre-resolved i18n bundle into card_spec.i18n_inline");
2394            } else {
2395                tracing::info!(%bundle_path, ?locale_keys, "auto-discovered i18n bundle and inlined into card_spec.i18n_inline");
2396            }
2397        }
2398    }
2399}
2400
2401fn load_i18n_bundle_entries<F>(bundle_path: &str, mut read_asset: F) -> JsonMap<String, Value>
2402where
2403    F: FnMut(&str) -> Result<Vec<u8>>,
2404{
2405    let mut i18n_entries = JsonMap::new();
2406
2407    if bundle_path.ends_with(".json") {
2408        if let Ok(bytes) = read_asset(bundle_path)
2409            && let Ok(Value::Object(entries)) = serde_json::from_slice::<Value>(&bytes)
2410        {
2411            i18n_entries.insert("en".to_string(), Value::Object(entries));
2412        }
2413        return i18n_entries;
2414    }
2415
2416    let manifest_path = format!("{bundle_path}/_manifest.json");
2417    let locale_codes: Vec<String> = read_asset(&manifest_path)
2418        .ok()
2419        .and_then(|bytes| serde_json::from_slice::<Value>(&bytes).ok())
2420        .and_then(|value| {
2421            let locales = value
2422                .get("locales")
2423                .and_then(Value::as_array)
2424                .cloned()
2425                .or_else(|| value.as_array().cloned());
2426            locales.map(|items| {
2427                items
2428                    .iter()
2429                    .filter_map(Value::as_str)
2430                    .map(String::from)
2431                    .collect()
2432            })
2433        })
2434        .unwrap_or_default();
2435
2436    tracing::info!(%bundle_path, ?locale_codes, "i18n manifest discovered locales");
2437
2438    for locale in &locale_codes {
2439        let candidate = format!("{bundle_path}/{locale}.json");
2440        if let Ok(bytes) = read_asset(&candidate)
2441            && let Ok(Value::Object(entries)) = serde_json::from_slice::<Value>(&bytes)
2442        {
2443            i18n_entries.insert(locale.clone(), Value::Object(entries));
2444        }
2445    }
2446    if !i18n_entries.contains_key("en") {
2447        let en_path = format!("{bundle_path}/en.json");
2448        if let Ok(bytes) = read_asset(&en_path)
2449            && let Ok(Value::Object(entries)) = serde_json::from_slice::<Value>(&bytes)
2450        {
2451            i18n_entries.insert("en".to_string(), Value::Object(entries));
2452        }
2453    }
2454
2455    i18n_entries
2456}
2457
2458/// Outcome of `evaluate_custom_routing` for a node's `Routing::Custom` array.
2459///
2460/// `Next` advances the flow to the named target. `End` terminates the run.
2461/// `Wait` pauses the run at the current node so the next inbound activity
2462/// resumes here and re-evaluates the routing with the new context — this is
2463/// what allows messaging flows (welcome → ... → confirm) to behave like a
2464/// live conversation instead of restarting at the entry point on every
2465/// click.
2466#[derive(Debug)]
2467pub(crate) enum CustomRoutingDecision {
2468    Next(NodeId),
2469    End,
2470    Wait,
2471}
2472
2473/// Evaluate a node's `Routing::Custom` array against the current execution
2474/// context.
2475///
2476/// Parses `Routing::Custom(Value)` as an array of `{condition, to}` objects.
2477/// Conditions are simple equality expressions like `response.action == "about"`.
2478/// Falls back to the first route without a condition (default route).
2479///
2480/// The evaluation context includes:
2481/// - All fields from the node output payload (top-level)
2482/// - `entry` / `in` — the original flow entry (incoming message)
2483/// - `response` — synthesized from entry metadata for convenient condition checks
2484///   (e.g. `response.action` maps to `metadata.action` from the incoming envelope)
2485fn evaluate_custom_routing(
2486    raw: &Value,
2487    output: &NodeOutput,
2488    state: &ExecutionState,
2489    flow_ir: &HostFlow,
2490    node_id: &NodeId,
2491) -> CustomRoutingDecision {
2492    let routes = match raw.as_array() {
2493        Some(arr) => arr,
2494        None => {
2495            tracing::warn!(
2496                flow_id = %flow_ir.id,
2497                node_id = %node_id,
2498                "custom routing is not an array; terminating"
2499            );
2500            return CustomRoutingDecision::End;
2501        }
2502    };
2503
2504    // Build a rich context for condition evaluation:
2505    // Start with output payload, then overlay entry and synthesised "response".
2506    let ctx = build_routing_context(output, state);
2507
2508    let mut has_condition = false;
2509    for route in routes {
2510        let condition = route.get("condition").and_then(|v| v.as_str());
2511        let to = route.get("to").and_then(|v| v.as_str());
2512
2513        if let Some(cond) = condition {
2514            has_condition = true;
2515            if evaluate_simple_condition(cond, &ctx)
2516                && let Some(target) = to
2517                && let Ok(nid) = NodeId::new(target)
2518            {
2519                tracing::debug!(
2520                    flow_id = %flow_ir.id,
2521                    node_id = %node_id,
2522                    condition = cond,
2523                    target = target,
2524                    "conditional route matched"
2525                );
2526                return CustomRoutingDecision::Next(nid);
2527            }
2528        } else if let Some(target) = to
2529            && let Ok(nid) = NodeId::new(target)
2530        {
2531            tracing::debug!(
2532                flow_id = %flow_ir.id,
2533                node_id = %node_id,
2534                target = target,
2535                "default route taken"
2536            );
2537            return CustomRoutingDecision::Next(nid);
2538        }
2539    }
2540
2541    // Fall-through. When the routing array contained at least one
2542    // conditional entry, treat the unmatched fall-through as a pause: the
2543    // user's next submission should be re-evaluated against this same
2544    // node's routing rather than restarting the flow from the entry point.
2545    // Routing arrays with no conditions at all (pure unconditional `out`
2546    // terminators) remain true ends.
2547    if has_condition {
2548        tracing::debug!(
2549            flow_id = %flow_ir.id,
2550            node_id = %node_id,
2551            "no conditional route matched; pausing run at current node for resume"
2552        );
2553        CustomRoutingDecision::Wait
2554    } else {
2555        tracing::warn!(
2556            flow_id = %flow_ir.id,
2557            node_id = %node_id,
2558            "no route matched and no conditions present; terminating"
2559        );
2560        CustomRoutingDecision::End
2561    }
2562}
2563
2564/// Evaluate a simple condition expression like `response.action == "about"`.
2565///
2566/// Supports dotted path lookups against a JSON value context.
2567/// Format: `<path> == "<value>"` or `<path> != "<value>"`
2568fn evaluate_simple_condition(condition: &str, ctx: &Value) -> bool {
2569    // Parse: `path == "value"` or `path != "value"`
2570    let (path, expected, negate) = if let Some(idx) = condition.find("==") {
2571        let path = condition[..idx].trim();
2572        let val = condition[idx + 2..].trim().trim_matches('"');
2573        (path, val, false)
2574    } else if let Some(idx) = condition.find("!=") {
2575        let path = condition[..idx].trim();
2576        let val = condition[idx + 2..].trim().trim_matches('"');
2577        (path, val, true)
2578    } else {
2579        return false;
2580    };
2581
2582    // Resolve dotted path against context (case-insensitive comparison)
2583    let actual = resolve_dotted_path(ctx, path);
2584    let matches = actual
2585        .as_deref()
2586        .is_some_and(|a| a.eq_ignore_ascii_case(expected));
2587    if negate { !matches } else { matches }
2588}
2589
2590/// Resolve a dotted path like `response.action` against a JSON value.
2591fn resolve_dotted_path(value: &Value, path: &str) -> Option<String> {
2592    let parts: Vec<&str> = path.split('.').collect();
2593    let mut current = value;
2594    for part in &parts {
2595        current = current.get(part)?;
2596    }
2597    match current {
2598        Value::String(s) => Some(s.clone()),
2599        Value::Bool(b) => Some(b.to_string()),
2600        Value::Number(n) => Some(n.to_string()),
2601        _ => Some(current.to_string()),
2602    }
2603}
2604
2605/// Build a context object for routing condition evaluation.
2606///
2607/// The context merges the node output with the flow entry so that conditions
2608/// can reference both component results and incoming message data.
2609///
2610/// Layout:
2611/// ```text
2612/// {
2613///   ...output.payload...,     // top-level fields from component output
2614///   "entry": <flow entry>,
2615///   "in":    <flow entry>,    // alias
2616///   "response": {             // synthesised from envelope metadata
2617///     <key>: <value>,         // e.g. "action": "about"
2618///     ...
2619///   }
2620/// }
2621/// ```
2622fn build_routing_context(output: &NodeOutput, state: &ExecutionState) -> Value {
2623    let mut ctx = match &output.payload {
2624        Value::Object(map) => map.clone(),
2625        _ => JsonMap::new(),
2626    };
2627
2628    let entry = &state.entry;
2629    ctx.insert("entry".into(), entry.clone());
2630    ctx.insert("in".into(), entry.clone());
2631
2632    // Synthesise "response" from the envelope metadata.
2633    // greentic-start demo path: entry.input.metadata.*
2634    // greentic-runner direct path: entry.metadata.*
2635    let metadata = entry
2636        .pointer("/input/metadata")
2637        .or_else(|| entry.pointer("/metadata"));
2638
2639    let mut response = JsonMap::new();
2640    if let Some(Value::Object(meta)) = metadata {
2641        for (k, v) in meta {
2642            // Flatten string values; stringify others
2643            match v {
2644                Value::String(s) => {
2645                    response.insert(k.clone(), Value::String(s.clone()));
2646                }
2647                other => {
2648                    response.insert(k.clone(), other.clone());
2649                }
2650            }
2651        }
2652    }
2653    // Also pull text from the envelope for convenience
2654    if let Some(text) = entry
2655        .pointer("/input/text")
2656        .or_else(|| entry.pointer("/text"))
2657        .filter(|t| !t.is_null())
2658    {
2659        response.insert("text".into(), text.clone());
2660    }
2661    ctx.insert("response".into(), Value::Object(response));
2662
2663    Value::Object(ctx)
2664}
2665
2666#[cfg(test)]
2667mod tests {
2668    use super::*;
2669    use crate::validate::{ValidationConfig, ValidationMode};
2670    use greentic_types::{
2671        Flow, FlowComponentRef, FlowId, FlowKind, InputMapping, Node, NodeId, OutputMapping,
2672        Routing, TelemetryHints,
2673    };
2674    use serde_json::json;
2675    use std::collections::{BTreeMap, HashMap as StdHashMap};
2676    use std::str::FromStr;
2677    use std::sync::Mutex;
2678    use tokio::runtime::Runtime;
2679
2680    fn minimal_engine() -> FlowEngine {
2681        FlowEngine {
2682            packs: Vec::new(),
2683            flows: Vec::new(),
2684            flow_sources: HashMap::new(),
2685            flow_cache: RwLock::new(HashMap::new()),
2686            default_env: "local".to_string(),
2687            validation: ValidationConfig {
2688                mode: ValidationMode::Off,
2689            },
2690            cross_pack_resolver: None,
2691            rollout_ids: RolloutIds::default(),
2692        }
2693    }
2694
2695    #[test]
2696    fn templating_renders_with_partials_and_data() {
2697        let mut state = ExecutionState::new(json!({ "city": "London" }));
2698        state.nodes.insert(
2699            "forecast".to_string(),
2700            NodeOutput::new(json!({ "temp": "20C" })),
2701        );
2702
2703        // templating context includes node outputs for runner-side payload rendering.
2704        let ctx = state.context();
2705        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
2706    }
2707
2708    #[test]
2709    fn finalize_wraps_emitted_payloads() {
2710        let mut state = ExecutionState::new(json!({}));
2711        state.push_egress(json!({ "text": "first" }));
2712        state.push_egress(json!({ "text": "second" }));
2713        let result = state.finalize_with(Some(json!({ "text": "final" })));
2714        assert_eq!(
2715            result,
2716            json!([
2717                { "text": "first" },
2718                { "text": "second" },
2719                { "text": "final" }
2720            ])
2721        );
2722    }
2723
2724    #[test]
2725    fn finalize_flattens_final_array() {
2726        let mut state = ExecutionState::new(json!({}));
2727        state.push_egress(json!({ "text": "only" }));
2728        let result = state.finalize_with(Some(json!([
2729            { "text": "extra-1" },
2730            { "text": "extra-2" }
2731        ])));
2732        assert_eq!(
2733            result,
2734            json!([
2735                { "text": "only" },
2736                { "text": "extra-1" },
2737                { "text": "extra-2" }
2738            ])
2739        );
2740    }
2741
2742    #[test]
2743    fn inject_card_locale_uses_entry_metadata_without_overwriting_payload() {
2744        let mut payload = json!({
2745            "card_source": "inline",
2746            "card_spec": { "title": "Hello" }
2747        });
2748        inject_card_locale(
2749            &mut payload,
2750            &json!({"input": {"metadata": {"locale": "nl-NL"}}}),
2751        );
2752        assert_eq!(payload["locale"], json!("nl-NL"));
2753
2754        let mut existing = json!({
2755            "card_source": "inline",
2756            "card_spec": { "title": "Hello" },
2757            "locale": "en-GB"
2758        });
2759        inject_card_locale(&mut existing, &json!({"metadata": {"locale": "nl-NL"}}));
2760        assert_eq!(existing["locale"], json!("en-GB"));
2761    }
2762
2763    #[test]
2764    fn load_i18n_bundle_entries_reads_manifest_and_falls_back_to_en() {
2765        let assets = StdHashMap::from([
2766            (
2767                "cards/i18n/_manifest.json".to_string(),
2768                br#"{"locales":["de"]}"#.to_vec(),
2769            ),
2770            (
2771                "cards/i18n/de.json".to_string(),
2772                br#"{"title":"Hallo"}"#.to_vec(),
2773            ),
2774            (
2775                "cards/i18n/en.json".to_string(),
2776                br#"{"title":"Hello"}"#.to_vec(),
2777            ),
2778        ]);
2779
2780        let entries = load_i18n_bundle_entries("cards/i18n", |path| {
2781            assets
2782                .get(path)
2783                .cloned()
2784                .with_context(|| format!("missing asset {path}"))
2785        });
2786
2787        assert_eq!(entries["de"]["title"], json!("Hallo"));
2788        assert_eq!(entries["en"]["title"], json!("Hello"));
2789    }
2790
2791    #[test]
2792    fn load_i18n_bundle_entries_reads_single_file_bundle() {
2793        let entries = load_i18n_bundle_entries("cards/i18n.json", |path| {
2794            if path == "cards/i18n.json" {
2795                Ok(br#"{"title":"Hello"}"#.to_vec())
2796            } else {
2797                bail!("unexpected asset {path}");
2798            }
2799        });
2800
2801        assert_eq!(entries["en"]["title"], json!("Hello"));
2802    }
2803
2804    struct TestCrossPackResolver;
2805
2806    impl CrossPackResolver for TestCrossPackResolver {
2807        fn invoke(
2808            &self,
2809            provider_id: &str,
2810            provider_type: Option<&str>,
2811            op: &str,
2812            input: &[u8],
2813            tenant: &str,
2814            team: Option<&str>,
2815        ) -> Result<Value> {
2816            Ok(json!({
2817                "provider_id": provider_id,
2818                "provider_type": provider_type,
2819                "op": op,
2820                "tenant": tenant,
2821                "team": team,
2822                "input": serde_json::from_slice::<Value>(input)?,
2823            }))
2824        }
2825    }
2826
2827    #[test]
2828    fn cross_pack_resolver_returns_node_output_when_present() {
2829        let mut engine = minimal_engine();
2830        engine.set_cross_pack_resolver(Arc::new(TestCrossPackResolver));
2831
2832        let output = engine
2833            .try_invoke_cross_pack_resolver(
2834                Some("mail"),
2835                Some("messaging"),
2836                "send",
2837                br#"{"subject":"hello"}"#,
2838                "demo",
2839            )
2840            .expect("resolver invocation")
2841            .expect("resolver output");
2842
2843        assert_eq!(
2844            output.payload,
2845            json!({
2846                "provider_id": "mail",
2847                "provider_type": "messaging",
2848                "op": "send",
2849                "tenant": "demo",
2850                "team": null,
2851                "input": { "subject": "hello" },
2852            })
2853        );
2854    }
2855
2856    #[test]
2857    fn parse_component_control_ignores_plain_payload() {
2858        let payload = json!({
2859            "flow": "not-a-control-field",
2860            "node": "n1"
2861        });
2862        let control = parse_component_control(&payload).expect("parse control");
2863        assert!(control.is_none());
2864    }
2865
2866    #[test]
2867    fn parse_component_control_parses_jump_marker() {
2868        let payload = json!({
2869            "greentic_control": {
2870                "action": "jump",
2871                "v": 1,
2872                "flow": "flow.b",
2873                "node": "node-2",
2874                "payload": { "message": "hi" },
2875                "hints": { "k": "v" },
2876                "max_redirects": 2,
2877                "reason": "handoff"
2878            }
2879        });
2880        let control = parse_component_control(&payload)
2881            .expect("parse control")
2882            .expect("missing control");
2883        match control {
2884            NodeControl::Jump(jump) => {
2885                assert_eq!(jump.flow, "flow.b");
2886                assert_eq!(jump.node.as_deref(), Some("node-2"));
2887                assert_eq!(jump.payload, json!({ "message": "hi" }));
2888                assert_eq!(jump.hints, json!({ "k": "v" }));
2889                assert_eq!(jump.max_redirects, Some(2));
2890                assert_eq!(jump.reason.as_deref(), Some("handoff"));
2891            }
2892            other => panic!("expected jump control, got {other:?}"),
2893        }
2894    }
2895
2896    #[test]
2897    fn parse_component_control_rejects_invalid_marker() {
2898        let payload = json!({
2899            "greentic_control": "bad-shape"
2900        });
2901        let err = parse_component_control(&payload).expect_err("expected invalid marker error");
2902        assert!(err.to_string().contains("greentic_control"));
2903    }
2904
2905    #[test]
2906    fn missing_operation_reports_node_and_component() {
2907        let engine = minimal_engine();
2908        let rt = Runtime::new().unwrap();
2909        let retry_config = RetryConfig {
2910            max_attempts: 1,
2911            base_delay_ms: 1,
2912        };
2913        let ctx = FlowContext {
2914            tenant: "tenant",
2915            pack_id: "test-pack",
2916            flow_id: "flow",
2917            node_id: Some("missing-op"),
2918            tool: None,
2919            action: None,
2920            session_id: None,
2921            provider_id: None,
2922            retry_config,
2923            attempt: 1,
2924            observer: None,
2925            mocks: None,
2926        };
2927        let node = HostNode {
2928            kind: NodeKind::Exec {
2929                target_component: "qa.process".into(),
2930            },
2931            component: "component.exec".into(),
2932            component_id: "component.exec".into(),
2933            operation_name: None,
2934            operation_in_mapping: None,
2935            payload_expr: Value::Null,
2936            routing: Routing::End,
2937        };
2938        let _state = ExecutionState::new(Value::Null);
2939        let payload = json!({ "component": "qa.process" });
2940        let event = NodeEvent {
2941            context: &ctx,
2942            node_id: "missing-op",
2943            node: &node,
2944            payload: &payload,
2945        };
2946        let err = rt
2947            .block_on(engine.execute_component_exec(
2948                &ctx,
2949                "missing-op",
2950                &node,
2951                payload.clone(),
2952                &event,
2953                ComponentOverrides {
2954                    component: None,
2955                    operation: None,
2956                },
2957            ))
2958            .unwrap_err();
2959        let message = err.to_string();
2960        assert!(
2961            message.contains("missing operation for node `missing-op`"),
2962            "unexpected message: {message}"
2963        );
2964        assert!(
2965            message.contains("(component `component.exec`)"),
2966            "unexpected message: {message}"
2967        );
2968    }
2969
2970    #[test]
2971    fn missing_operation_mentions_mapping_hint() {
2972        let engine = minimal_engine();
2973        let rt = Runtime::new().unwrap();
2974        let retry_config = RetryConfig {
2975            max_attempts: 1,
2976            base_delay_ms: 1,
2977        };
2978        let ctx = FlowContext {
2979            tenant: "tenant",
2980            pack_id: "test-pack",
2981            flow_id: "flow",
2982            node_id: Some("missing-op-hint"),
2983            tool: None,
2984            action: None,
2985            session_id: None,
2986            provider_id: None,
2987            retry_config,
2988            attempt: 1,
2989            observer: None,
2990            mocks: None,
2991        };
2992        let node = HostNode {
2993            kind: NodeKind::Exec {
2994                target_component: "qa.process".into(),
2995            },
2996            component: "component.exec".into(),
2997            component_id: "component.exec".into(),
2998            operation_name: None,
2999            operation_in_mapping: Some("render".into()),
3000            payload_expr: Value::Null,
3001            routing: Routing::End,
3002        };
3003        let _state = ExecutionState::new(Value::Null);
3004        let payload = json!({ "component": "qa.process" });
3005        let event = NodeEvent {
3006            context: &ctx,
3007            node_id: "missing-op-hint",
3008            node: &node,
3009            payload: &payload,
3010        };
3011        let err = rt
3012            .block_on(engine.execute_component_exec(
3013                &ctx,
3014                "missing-op-hint",
3015                &node,
3016                payload.clone(),
3017                &event,
3018                ComponentOverrides {
3019                    component: None,
3020                    operation: None,
3021                },
3022            ))
3023            .unwrap_err();
3024        let message = err.to_string();
3025        assert!(
3026            message.contains("missing operation for node `missing-op-hint`"),
3027            "unexpected message: {message}"
3028        );
3029        assert!(
3030            message.contains("Found operation in input.mapping (`render`)"),
3031            "unexpected message: {message}"
3032        );
3033    }
3034
3035    struct CountingObserver {
3036        starts: Mutex<Vec<String>>,
3037        ends: Mutex<Vec<Value>>,
3038    }
3039
3040    impl CountingObserver {
3041        fn new() -> Self {
3042            Self {
3043                starts: Mutex::new(Vec::new()),
3044                ends: Mutex::new(Vec::new()),
3045            }
3046        }
3047    }
3048
3049    impl ExecutionObserver for CountingObserver {
3050        fn on_node_start(&self, event: &NodeEvent<'_>) {
3051            self.starts.lock().unwrap().push(event.node_id.to_string());
3052        }
3053
3054        fn on_node_end(&self, _event: &NodeEvent<'_>, output: &Value) {
3055            self.ends.lock().unwrap().push(output.clone());
3056        }
3057
3058        fn on_node_error(&self, _event: &NodeEvent<'_>, _error: &dyn StdError) {}
3059    }
3060
3061    #[test]
3062    fn emits_end_event_for_successful_node() {
3063        let node_id = NodeId::from_str("emit").unwrap();
3064        let node = Node {
3065            id: node_id.clone(),
3066            component: FlowComponentRef {
3067                id: "emit.log".parse().unwrap(),
3068                pack_alias: None,
3069                operation: None,
3070            },
3071            input: InputMapping {
3072                mapping: json!({ "message": "logged" }),
3073            },
3074            output: OutputMapping {
3075                mapping: Value::Null,
3076            },
3077            err_map: None,
3078            routing: Routing::End,
3079            telemetry: TelemetryHints::default(),
3080        };
3081        let mut nodes = indexmap::IndexMap::default();
3082        nodes.insert(node_id.clone(), node);
3083        let flow = Flow {
3084            schema_version: "1.0".into(),
3085            id: FlowId::from_str("emit.flow").unwrap(),
3086            kind: FlowKind::Messaging,
3087            entrypoints: BTreeMap::from([(
3088                "default".to_string(),
3089                Value::String(node_id.to_string()),
3090            )]),
3091            nodes,
3092            metadata: Default::default(),
3093        };
3094        let host_flow = HostFlow::from(flow);
3095
3096        let engine = FlowEngine {
3097            packs: Vec::new(),
3098            flows: Vec::new(),
3099            flow_sources: HashMap::new(),
3100            flow_cache: RwLock::new(HashMap::from([(
3101                FlowKey {
3102                    pack_id: "test-pack".to_string(),
3103                    flow_id: "emit.flow".to_string(),
3104                },
3105                host_flow,
3106            )])),
3107            default_env: "local".to_string(),
3108            validation: ValidationConfig {
3109                mode: ValidationMode::Off,
3110            },
3111            cross_pack_resolver: None,
3112            rollout_ids: RolloutIds::default(),
3113        };
3114        let observer = CountingObserver::new();
3115        let ctx = FlowContext {
3116            tenant: "demo",
3117            pack_id: "test-pack",
3118            flow_id: "emit.flow",
3119            node_id: None,
3120            tool: None,
3121            action: None,
3122            session_id: None,
3123            provider_id: None,
3124            retry_config: RetryConfig {
3125                max_attempts: 1,
3126                base_delay_ms: 1,
3127            },
3128            attempt: 1,
3129            observer: Some(&observer),
3130            mocks: None,
3131        };
3132
3133        let rt = Runtime::new().unwrap();
3134        let result = rt.block_on(engine.execute(ctx, Value::Null)).unwrap();
3135        assert!(matches!(result.status, FlowStatus::Completed));
3136
3137        let starts = observer.starts.lock().unwrap();
3138        let ends = observer.ends.lock().unwrap();
3139        assert_eq!(starts.len(), 1);
3140        assert_eq!(ends.len(), 1);
3141        assert_eq!(ends[0], json!({ "message": "logged" }));
3142    }
3143
3144    #[test]
3145    fn dotted_component_id_with_mapping_operation_is_not_split() {
3146        // greentic-pack resolves a component node to a bare component symbol and
3147        // keeps the operation in the input mapping. The runtime must NOT split the
3148        // dotted symbol on the last dot (which would yield `ai.greentic`, "not
3149        // found in pack"); the structured mapping operation makes the id a
3150        // complete reference.
3151        let node = Node {
3152            id: NodeId::from_str("render").unwrap(),
3153            component: FlowComponentRef {
3154                id: "ai.greentic.component-templates".parse().unwrap(),
3155                pack_alias: None,
3156                operation: None,
3157            },
3158            input: InputMapping {
3159                mapping: json!({ "operation": "handle_message", "input": "hi" }),
3160            },
3161            output: OutputMapping {
3162                mapping: Value::Null,
3163            },
3164            err_map: None,
3165            routing: Routing::End,
3166            telemetry: TelemetryHints::default(),
3167        };
3168        let host = HostNode::from(node);
3169        assert!(
3170            matches!(&host.kind, NodeKind::PackComponent { component_ref } if component_ref == "ai.greentic.component-templates"),
3171            "dotted component id must stay intact, got kind {:?}",
3172            host.kind
3173        );
3174        assert_eq!(host.component, "ai.greentic.component-templates");
3175        assert_eq!(host.operation_in_mapping(), Some("handle_message"));
3176    }
3177
3178    #[test]
3179    fn packed_component_operation_id_still_splits_without_mapping_operation() {
3180        // Legacy encoding: the operation is packed into the id as
3181        // `<component>.<operation>` and absent from the mapping. The last-dot
3182        // split must still recover it.
3183        let node = Node {
3184            id: NodeId::from_str("render").unwrap(),
3185            component: FlowComponentRef {
3186                id: "templating.handlebars".parse().unwrap(),
3187                pack_alias: None,
3188                operation: None,
3189            },
3190            input: InputMapping {
3191                mapping: json!({ "text": "hello" }),
3192            },
3193            output: OutputMapping {
3194                mapping: Value::Null,
3195            },
3196            err_map: None,
3197            routing: Routing::End,
3198            telemetry: TelemetryHints::default(),
3199        };
3200        let host = HostNode::from(node);
3201        assert!(
3202            matches!(&host.kind, NodeKind::PackComponent { component_ref } if component_ref == "templating"),
3203            "packed <component>.<operation> id must split, got kind {:?}",
3204            host.kind
3205        );
3206        assert_eq!(host.operation_name(), Some("handlebars"));
3207    }
3208
3209    fn host_flow_for_test(
3210        flow_id: &str,
3211        node_ids: &[&str],
3212        default_start: Option<&str>,
3213    ) -> HostFlow {
3214        let mut nodes = indexmap::IndexMap::default();
3215        for node_id in node_ids {
3216            let id = NodeId::from_str(node_id).unwrap();
3217            let node = Node {
3218                id: id.clone(),
3219                component: FlowComponentRef {
3220                    id: "emit.log".parse().unwrap(),
3221                    pack_alias: None,
3222                    operation: None,
3223                },
3224                input: InputMapping {
3225                    mapping: json!({ "message": node_id }),
3226                },
3227                output: OutputMapping {
3228                    mapping: Value::Null,
3229                },
3230                err_map: None,
3231                routing: Routing::End,
3232                telemetry: TelemetryHints::default(),
3233            };
3234            nodes.insert(id, node);
3235        }
3236        let mut entrypoints = BTreeMap::new();
3237        if let Some(start) = default_start {
3238            entrypoints.insert("default".to_string(), Value::String(start.to_string()));
3239        }
3240        HostFlow::from(Flow {
3241            schema_version: "1.0".into(),
3242            id: FlowId::from_str(flow_id).unwrap(),
3243            kind: FlowKind::Messaging,
3244            entrypoints,
3245            nodes,
3246            metadata: Default::default(),
3247        })
3248    }
3249
3250    fn jump_test_engine() -> FlowEngine {
3251        let target_flow = host_flow_for_test("flow.target", &["node-a", "node-b"], None);
3252        FlowEngine {
3253            packs: Vec::new(),
3254            flows: Vec::new(),
3255            flow_sources: HashMap::new(),
3256            flow_cache: RwLock::new(HashMap::from([(
3257                FlowKey {
3258                    pack_id: "test-pack".to_string(),
3259                    flow_id: "flow.target".to_string(),
3260                },
3261                target_flow,
3262            )])),
3263            default_env: "local".to_string(),
3264            validation: ValidationConfig {
3265                mode: ValidationMode::Off,
3266            },
3267            cross_pack_resolver: None,
3268            rollout_ids: RolloutIds::default(),
3269        }
3270    }
3271
3272    fn jump_ctx<'a>(flow_id: &'a str) -> FlowContext<'a> {
3273        FlowContext {
3274            tenant: "demo",
3275            pack_id: "test-pack",
3276            flow_id,
3277            node_id: None,
3278            tool: None,
3279            action: None,
3280            session_id: None,
3281            provider_id: None,
3282            retry_config: RetryConfig {
3283                max_attempts: 1,
3284                base_delay_ms: 1,
3285            },
3286            attempt: 1,
3287            observer: None,
3288            mocks: None,
3289        }
3290    }
3291
3292    #[test]
3293    fn with_rollout_ids_binds_revision_identity() {
3294        let engine = minimal_engine().with_rollout_ids(RolloutIds {
3295            customer_id: Some("cust-acme".into()),
3296            deployment_id: Some("01JTKS".into()),
3297            bundle_id: Some("customer.support".into()),
3298            revision_id: Some("01JTKR".into()),
3299        });
3300        assert_eq!(engine.rollout_ids.revision_id.as_deref(), Some("01JTKR"));
3301        assert_eq!(engine.rollout_ids.deployment_id.as_deref(), Some("01JTKS"));
3302        // A freshly-built engine carries no rollout identity (legacy runtime).
3303        assert!(minimal_engine().rollout_ids.is_empty());
3304    }
3305
3306    #[test]
3307    fn apply_jump_unknown_flow_errors() {
3308        let engine = minimal_engine();
3309        let mut state = ExecutionState::new(Value::Null);
3310        let rt = Runtime::new().unwrap();
3311        let err = rt
3312            .block_on(engine.apply_jump(
3313                &jump_ctx("flow.source"),
3314                &mut state,
3315                JumpControl {
3316                    flow: "flow.missing".into(),
3317                    node: None,
3318                    payload: json!({ "ok": true }),
3319                    hints: Value::Null,
3320                    max_redirects: None,
3321                    reason: None,
3322                },
3323            ))
3324            .unwrap_err();
3325        assert!(
3326            err.to_string().contains("unknown_flow"),
3327            "unexpected error: {err}"
3328        );
3329    }
3330
3331    #[test]
3332    fn apply_jump_unknown_node_errors() {
3333        let engine = jump_test_engine();
3334        let mut state = ExecutionState::new(Value::Null);
3335        let rt = Runtime::new().unwrap();
3336        let err = rt
3337            .block_on(engine.apply_jump(
3338                &jump_ctx("flow.source"),
3339                &mut state,
3340                JumpControl {
3341                    flow: "flow.target".into(),
3342                    node: Some("node-missing".into()),
3343                    payload: json!({ "ok": true }),
3344                    hints: Value::Null,
3345                    max_redirects: None,
3346                    reason: None,
3347                },
3348            ))
3349            .unwrap_err();
3350        assert!(
3351            err.to_string().contains("unknown_node"),
3352            "unexpected error: {err}"
3353        );
3354    }
3355
3356    #[test]
3357    fn apply_jump_uses_default_start_fallback() {
3358        let engine = jump_test_engine();
3359        let mut state = ExecutionState::new(Value::Null);
3360        let rt = Runtime::new().unwrap();
3361        let target = rt
3362            .block_on(engine.apply_jump(
3363                &jump_ctx("flow.source"),
3364                &mut state,
3365                JumpControl {
3366                    flow: "flow.target".into(),
3367                    node: None,
3368                    payload: json!({ "k": "v" }),
3369                    hints: Value::Null,
3370                    max_redirects: None,
3371                    reason: None,
3372                },
3373            ))
3374            .expect("jump target");
3375        assert_eq!(target.flow_id, "flow.target");
3376        assert_eq!(target.node_id.as_str(), "node-a");
3377    }
3378
3379    #[test]
3380    fn apply_jump_redirect_limit_enforced() {
3381        let engine = jump_test_engine();
3382        let mut state = ExecutionState::new(Value::Null);
3383        state.redirect_count = 3;
3384        let rt = Runtime::new().unwrap();
3385        let err = rt
3386            .block_on(engine.apply_jump(
3387                &jump_ctx("flow.source"),
3388                &mut state,
3389                JumpControl {
3390                    flow: "flow.target".into(),
3391                    node: None,
3392                    payload: json!({ "k": "v" }),
3393                    hints: Value::Null,
3394                    max_redirects: Some(3),
3395                    reason: None,
3396                },
3397            ))
3398            .unwrap_err();
3399        assert_eq!(err.to_string(), "redirect_limit");
3400    }
3401
3402    /// Regression: a `Routing::Custom` array containing at least one
3403    /// conditional entry must pause (return `Wait`) when no condition
3404    /// matches, instead of terminating. Concrete bug it guards against:
3405    /// every card click used to terminate the flow because the entry-card's
3406    /// routing array didn't enumerate every downstream action, so users got
3407    /// looped back to the entry on every interaction.
3408    #[test]
3409    fn evaluate_custom_routing_waits_when_conditional_falls_through() {
3410        let raw_routing = json!([
3411            { "condition": "response.action == \"go\"", "to": "next" },
3412            { "out": true }
3413        ]);
3414        let flow_ir = HostFlow {
3415            id: "flow.test".to_string(),
3416            start: None,
3417            nodes: IndexMap::new(),
3418            slot_schema: None,
3419        };
3420        let current_node = NodeId::from_str("current").unwrap();
3421        let output = NodeOutput::new(Value::Null);
3422
3423        // First case: empty action -> conditional does not match, must wait.
3424        let mut state_empty = ExecutionState::new(json!({ "metadata": { "action": "" } }));
3425        state_empty.entry = json!({ "metadata": { "action": "" } });
3426        let decision_empty =
3427            evaluate_custom_routing(&raw_routing, &output, &state_empty, &flow_ir, &current_node);
3428        assert!(
3429            matches!(decision_empty, CustomRoutingDecision::Wait),
3430            "expected Wait on conditional fall-through, got {decision_empty:?}"
3431        );
3432
3433        // Second case: action == "go" -> conditional matches, must advance.
3434        let mut state_go = ExecutionState::new(json!({ "metadata": { "action": "go" } }));
3435        state_go.entry = json!({ "metadata": { "action": "go" } });
3436        let decision_go =
3437            evaluate_custom_routing(&raw_routing, &output, &state_go, &flow_ir, &current_node);
3438        match decision_go {
3439            CustomRoutingDecision::Next(nid) => assert_eq!(nid.as_str(), "next"),
3440            other => panic!("expected Next(\"next\"), got {other:?}"),
3441        }
3442    }
3443
3444    #[test]
3445    fn node_output_with_error_marks_ok_false_and_stashes_in_meta() {
3446        let err: Box<dyn std::error::Error + 'static> =
3447            Box::<dyn std::error::Error + 'static>::from("weatherapi returned 401 Unauthorized");
3448        let out = NodeOutput::with_error("call_weather", err.as_ref());
3449        assert!(!out.ok);
3450        assert_eq!(out.payload, Value::Null);
3451        assert_eq!(out.meta["error"]["kind"], "flow_node_failed");
3452        assert_eq!(out.meta["error"]["node_id"], "call_weather");
3453        assert_eq!(
3454            out.meta["error"]["message"],
3455            "weatherapi returned 401 Unauthorized"
3456        );
3457    }
3458
3459    #[test]
3460    fn lift_first_node_error_promotes_node_meta_to_output_metadata() {
3461        // Two nodes ran; the first failed, the second produced a default-
3462        // looking output (flow author wrote no error routing). The executor
3463        // must lift the first failure into output.metadata so the messaging
3464        // provider renders the error card without any flow-author changes.
3465        let mut nodes: HashMap<String, NodeOutput> = HashMap::new();
3466        let err: Box<dyn std::error::Error + 'static> =
3467            Box::<dyn std::error::Error + 'static>::from("weatherapi returned 401 Unauthorized");
3468        nodes.insert(
3469            "call_weather".to_string(),
3470            NodeOutput::with_error("call_weather", err.as_ref()),
3471        );
3472        nodes.insert(
3473            "render_current_card".to_string(),
3474            NodeOutput::new(json!({ "text": "message" })),
3475        );
3476
3477        let final_output = json!({ "text": "message" });
3478        let enriched = lift_first_node_error_from_nodes(final_output, &nodes);
3479        assert_eq!(
3480            enriched["metadata"]["error_kind"], "flow_node_failed",
3481            "first failing node's kind must be lifted"
3482        );
3483        assert_eq!(
3484            enriched["metadata"]["error_message"],
3485            "weatherapi returned 401 Unauthorized"
3486        );
3487        assert_eq!(enriched["metadata"]["node_id"], "call_weather");
3488        // Preserves the original payload bits so downstream renderers still
3489        // see what the flow produced.
3490        assert_eq!(enriched["text"], "message");
3491    }
3492
3493    #[test]
3494    fn lift_first_node_error_is_noop_when_all_nodes_ok() {
3495        let mut nodes: HashMap<String, NodeOutput> = HashMap::new();
3496        nodes.insert(
3497            "ok_node".to_string(),
3498            NodeOutput::new(json!({ "text": "all good" })),
3499        );
3500        let output = json!({ "text": "all good" });
3501        let lifted = lift_first_node_error_from_nodes(output.clone(), &nodes);
3502        assert_eq!(lifted, output);
3503    }
3504
3505    #[tokio::test]
3506    async fn execute_user_facing_flow_failure_returns_completed_with_error_envelope() {
3507        // Flow whose start node is missing — drive_flow will return Err on
3508        // node lookup. With session_id present, execute() must convert that
3509        // to a Completed FlowExecution carrying error_kind/error_message in
3510        // output.metadata so the chat user sees the error card.
3511        let flow_id_str = "broken.flow";
3512        let pack_id_str = "test-pack";
3513        let host_flow = host_flow_for_test(flow_id_str, &["only-node"], Some("does-not-exist"));
3514        let engine = FlowEngine {
3515            packs: Vec::new(),
3516            flows: Vec::new(),
3517            flow_sources: HashMap::new(),
3518            flow_cache: RwLock::new(HashMap::from([(
3519                FlowKey {
3520                    pack_id: pack_id_str.to_string(),
3521                    flow_id: flow_id_str.to_string(),
3522                },
3523                host_flow,
3524            )])),
3525            default_env: "local".to_string(),
3526            validation: ValidationConfig {
3527                mode: ValidationMode::Off,
3528            },
3529            cross_pack_resolver: None,
3530            rollout_ids: RolloutIds::default(),
3531        };
3532        let ctx = FlowContext {
3533            tenant: "demo",
3534            pack_id: pack_id_str,
3535            flow_id: flow_id_str,
3536            node_id: None,
3537            tool: None,
3538            action: None,
3539            session_id: Some("conv-1"),
3540            provider_id: None,
3541            retry_config: RetryConfig {
3542                max_attempts: 1,
3543                base_delay_ms: 1,
3544            },
3545            attempt: 1,
3546            observer: None,
3547            mocks: None,
3548        };
3549        let result = engine
3550            .execute(ctx, Value::Null)
3551            .await
3552            .expect("must not propagate Err");
3553        assert!(matches!(result.status, FlowStatus::Completed));
3554        assert_eq!(
3555            result.output["metadata"]["error_kind"],
3556            "flow_execution_failed"
3557        );
3558        let msg = result.output["metadata"]["error_message"]
3559            .as_str()
3560            .unwrap_or("");
3561        assert!(!msg.is_empty(), "error_message must be populated");
3562        assert_eq!(result.output["metadata"]["flow_id"], "broken.flow");
3563    }
3564
3565    #[test]
3566    fn mcp_tool_error_recognises_generator_error_shape() {
3567        // greentic-mcp-generator's tool_error_with_status emits this exact
3568        // shape when the upstream HTTP call to weatherapi.com returns 401.
3569        let value = json!({
3570            "error": {
3571                "code": "tool_error",
3572                "message": "API request returned status 401",
3573                "status": 401
3574            }
3575        });
3576        let (code, message) = mcp_tool_error(&value).expect("must detect MCP error shape");
3577        assert_eq!(code, "tool_error");
3578        assert!(message.contains("API request returned status 401"));
3579        assert!(message.contains("(status 401)"));
3580    }
3581
3582    #[test]
3583    fn mcp_tool_error_skips_success_responses() {
3584        // A success response uses `result`, not `error`.
3585        let value = json!({ "result": { "current": { "temp_c": 19.0 } } });
3586        assert!(mcp_tool_error(&value).is_none());
3587    }
3588
3589    #[test]
3590    fn mcp_tool_error_skips_non_object_and_unrelated_shapes() {
3591        assert!(mcp_tool_error(&Value::Null).is_none());
3592        assert!(mcp_tool_error(&json!({"unrelated": true})).is_none());
3593        // `error` must be an object; a string isn't enough.
3594        assert!(mcp_tool_error(&json!({"error": "oops"})).is_none());
3595    }
3596
3597    #[tokio::test]
3598    async fn execute_non_user_facing_flow_failure_still_propagates() {
3599        // No session_id => internal job. Errors still propagate as Err so
3600        // operator alerting / metrics pipelines stay intact.
3601        let flow_id_str = "broken.flow";
3602        let pack_id_str = "test-pack";
3603        let host_flow = host_flow_for_test(flow_id_str, &["only-node"], Some("does-not-exist"));
3604        let engine = FlowEngine {
3605            packs: Vec::new(),
3606            flows: Vec::new(),
3607            flow_sources: HashMap::new(),
3608            flow_cache: RwLock::new(HashMap::from([(
3609                FlowKey {
3610                    pack_id: pack_id_str.to_string(),
3611                    flow_id: flow_id_str.to_string(),
3612                },
3613                host_flow,
3614            )])),
3615            default_env: "local".to_string(),
3616            validation: ValidationConfig {
3617                mode: ValidationMode::Off,
3618            },
3619            cross_pack_resolver: None,
3620            rollout_ids: RolloutIds::default(),
3621        };
3622        let ctx = FlowContext {
3623            tenant: "demo",
3624            pack_id: pack_id_str,
3625            flow_id: flow_id_str,
3626            node_id: None,
3627            tool: None,
3628            action: None,
3629            session_id: None,
3630            provider_id: None,
3631            retry_config: RetryConfig {
3632                max_attempts: 1,
3633                base_delay_ms: 1,
3634            },
3635            attempt: 1,
3636            observer: None,
3637            mocks: None,
3638        };
3639        let result = engine.execute(ctx, Value::Null).await;
3640        assert!(result.is_err(), "non-user-facing flow must propagate Err");
3641    }
3642
3643    // ---- Phase D: slot_schema injection tests ----
3644
3645    #[test]
3646    fn host_flow_extracts_slot_schema_from_metadata_extra() {
3647        use greentic_types::FlowMetadata;
3648        use std::collections::BTreeSet;
3649
3650        let schema = json!([
3651            {"name": "counterparty", "slot_type": "string", "required": true},
3652            {"name": "due_date", "slot_type": "date", "required": true}
3653        ]);
3654        let flow = Flow {
3655            schema_version: "flow-v1".into(),
3656            id: FlowId::from_str("test.flow").unwrap(),
3657            kind: FlowKind::Messaging,
3658            entrypoints: BTreeMap::new(),
3659            nodes: IndexMap::default(),
3660            metadata: FlowMetadata {
3661                title: None,
3662                description: None,
3663                tags: BTreeSet::new(),
3664                extra: json!({(SLOT_SCHEMA_METADATA_KEY): schema}),
3665            },
3666        };
3667        let host = HostFlow::from(flow);
3668        assert_eq!(
3669            host.slot_schema.as_ref(),
3670            Some(&schema),
3671            "HostFlow must extract slot_schema from metadata.extra"
3672        );
3673    }
3674
3675    #[test]
3676    fn host_flow_slot_schema_is_none_when_absent() {
3677        let flow = Flow {
3678            schema_version: "flow-v1".into(),
3679            id: FlowId::from_str("test.flow").unwrap(),
3680            kind: FlowKind::Messaging,
3681            entrypoints: BTreeMap::new(),
3682            nodes: IndexMap::default(),
3683            metadata: Default::default(),
3684        };
3685        let host = HostFlow::from(flow);
3686        assert!(
3687            host.slot_schema.is_none(),
3688            "HostFlow.slot_schema must be None when metadata.extra has no greentic.slot_schema"
3689        );
3690    }
3691
3692    #[test]
3693    fn inject_slot_definitions_adds_to_object_input() {
3694        let schema = json!([
3695            {"name": "city", "slot_type": "string"}
3696        ]);
3697        let mut input = json!({"utterance": "hello"});
3698        inject_slot_definitions(&mut input, &schema, "f", "n");
3699        assert_eq!(
3700            input,
3701            json!({"utterance": "hello", "slot_definitions": schema}),
3702            "slot_definitions must be injected into existing object"
3703        );
3704    }
3705
3706    #[test]
3707    fn inject_slot_definitions_wraps_null_input() {
3708        let schema = json!([{"name": "x", "slot_type": "string"}]);
3709        let mut input = Value::Null;
3710        inject_slot_definitions(&mut input, &schema, "f", "n");
3711        assert_eq!(
3712            input,
3713            json!({"slot_definitions": schema}),
3714            "null input must become an object with slot_definitions"
3715        );
3716    }
3717
3718    #[test]
3719    fn inject_slot_definitions_preserves_explicit_inline() {
3720        let flow_schema = json!([{"name": "city", "slot_type": "string"}]);
3721        let inline_defs = json!([{"name": "country", "slot_type": "string"}]);
3722        let mut input = json!({
3723            "utterance": "hello",
3724            "slot_definitions": inline_defs
3725        });
3726        inject_slot_definitions(&mut input, &flow_schema, "f", "n");
3727        assert_eq!(
3728            input["slot_definitions"], inline_defs,
3729            "explicit inline slot_definitions must not be overwritten"
3730        );
3731    }
3732
3733    #[test]
3734    fn inject_slot_definitions_skips_non_object_input() {
3735        let schema = json!([{"name": "x", "slot_type": "string"}]);
3736        let mut input = json!("a string");
3737        inject_slot_definitions(&mut input, &schema, "f", "n");
3738        assert_eq!(
3739            input,
3740            json!("a string"),
3741            "non-object input must be left unchanged"
3742        );
3743    }
3744
3745    fn make_flow_doc_for_test(
3746        id: &str,
3747        node_name: &str,
3748        component: &str,
3749        slot_schema: Option<Value>,
3750    ) -> greentic_flow::model::FlowDoc {
3751        use greentic_flow::model::{FlowDoc, NodeDoc};
3752
3753        let mut nodes = IndexMap::new();
3754        nodes.insert(
3755            node_name.to_string(),
3756            NodeDoc {
3757                raw: {
3758                    let mut m = IndexMap::new();
3759                    m.insert(
3760                        "component.exec".to_string(),
3761                        json!({ "component": component }),
3762                    );
3763                    m
3764                },
3765                routing: json!([{ "out": true }]),
3766                ..Default::default()
3767            },
3768        );
3769
3770        FlowDoc {
3771            id: id.into(),
3772            title: None,
3773            description: None,
3774            flow_type: "messaging".into(),
3775            start: Some(node_name.into()),
3776            parameters: json!({}),
3777            tags: Vec::new(),
3778            schema_version: None,
3779            entrypoints: IndexMap::new(),
3780            meta: None,
3781            slot_schema,
3782            nodes,
3783        }
3784    }
3785
3786    /// Integration test: exercises the real `greentic_flow::compile_flow`
3787    /// producer path with a `FlowDoc` carrying `slot_schema`, then converts
3788    /// through `HostFlow::from` and verifies the runtime-side `slot_schema`
3789    /// field is populated — closing the gap Codex flagged where the existing
3790    /// unit tests constructed `FlowMetadata` directly.
3791    #[test]
3792    fn compile_flow_round_trips_slot_schema_into_host_flow() {
3793        let slot_defs = json!([
3794            { "name": "counterparty", "slot_type": "string", "required": true,
3795              "pattern": ".+" },
3796            { "name": "due_date", "slot_type": "date", "required": true,
3797              "pattern": "\\d{4}-\\d{2}-\\d{2}" }
3798        ]);
3799        let doc = make_flow_doc_for_test(
3800            "slot-test",
3801            "extractor",
3802            "slot-extractor",
3803            Some(slot_defs.clone()),
3804        );
3805
3806        let flow = greentic_flow::compile_flow(doc).expect("compile_flow must succeed");
3807        assert_eq!(
3808            flow.metadata.extra.get(SLOT_SCHEMA_METADATA_KEY),
3809            Some(&slot_defs),
3810            "compile_flow must forward slot_schema into metadata.extra"
3811        );
3812
3813        let host = HostFlow::from(flow);
3814        assert_eq!(
3815            host.slot_schema.as_ref(),
3816            Some(&slot_defs),
3817            "HostFlow.slot_schema must survive the compile_flow -> HostFlow round-trip"
3818        );
3819    }
3820
3821    /// Verify that `compile_flow` without `slot_schema` produces a `Flow`
3822    /// whose `metadata.extra` has no `greentic.slot_schema` key, and that
3823    /// `HostFlow.slot_schema` stays `None` through the real compile path.
3824    #[test]
3825    fn compile_flow_without_slot_schema_leaves_host_flow_none() {
3826        let doc = make_flow_doc_for_test("no-slots", "echo", "echo", None);
3827
3828        let flow = greentic_flow::compile_flow(doc).expect("compile_flow must succeed");
3829        assert!(
3830            flow.metadata.extra.get(SLOT_SCHEMA_METADATA_KEY).is_none(),
3831            "metadata.extra must not contain greentic.slot_schema when FlowDoc.slot_schema is None"
3832        );
3833
3834        let host = HostFlow::from(flow);
3835        assert!(
3836            host.slot_schema.is_none(),
3837            "HostFlow.slot_schema must be None when FlowDoc has no slot_schema"
3838        );
3839    }
3840}
3841
3842use tracing::Instrument;
3843
/// Borrowed, per-invocation context handed to the flow engine.
///
/// Every string and helper is borrowed for `'a`, so the context itself owns
/// nothing beyond the small `RetryConfig` and the attempt counter.
pub struct FlowContext<'a> {
    /// Tenant the invocation belongs to.
    pub tenant: &'a str,
    /// Pack that contains the flow.
    pub pack_id: &'a str,
    /// Flow to execute.
    pub flow_id: &'a str,
    /// Optional node id. NOTE(review): presumably a start/resume node
    /// override; confirm against the executor.
    pub node_id: Option<&'a str>,
    /// Optional tool name associated with the invocation.
    pub tool: Option<&'a str>,
    /// Optional action name associated with the invocation.
    pub action: Option<&'a str>,
    /// Conversation/session id, when there is one. The tests in this file
    /// treat a present `session_id` as a user-facing run and `None` as an
    /// internal job.
    pub session_id: Option<&'a str>,
    /// Optional provider id associated with the invocation.
    pub provider_id: Option<&'a str>,
    /// Retry policy for this invocation.
    pub retry_config: RetryConfig,
    /// Attempt counter for this invocation. NOTE(review): tests start it at
    /// 1, so presumably 1-based; confirm.
    pub attempt: u32,
    /// Optional observer borrowed for the duration of the execution.
    pub observer: Option<&'a dyn ExecutionObserver>,
    /// Optional mock layer borrowed for the duration of the execution.
    pub mocks: Option<&'a MockLayer>,
}
3858
/// Retry policy carried on a `FlowContext`.
///
/// When converted from a `FlowRetryConfig`, both fields are clamped to a
/// floor (at least 1 attempt, at least 50 ms); a value built by hand is
/// taken as-is.
#[derive(Copy, Clone)]
pub struct RetryConfig {
    /// Maximum number of attempts.
    pub max_attempts: u32,
    /// Base delay between attempts, in milliseconds.
    pub base_delay_ms: u64,
}
3864
3865/// Look across all node outputs, find the first one that finished with
3866/// `ok=false`, and lift its `meta.error` fields into
3867/// `output.metadata.error_kind` / `.error_message` / `.node_id`. Returns the
3868/// (possibly enriched) output unchanged otherwise.
3869///
3870/// This is how the executor "shows" an unhandled flow-node failure to the
3871/// caller without the flow author having to add error routing: the chat-side
3872/// provider (messaging-providers `extract_error_envelope`) picks the lifted
3873/// fields off `output.metadata` and renders a styled error card.
3874///
3875/// Takes a borrow of the node-output map rather than the whole
3876/// `ExecutionState` because the callers have already consumed `state` via
3877/// `state.finalize_with(...)`; we capture a cheap clone of `state.nodes` up
3878/// front and pass it in here.
3879fn lift_first_node_error_from_nodes(output: Value, nodes: &HashMap<String, NodeOutput>) -> Value {
3880    let Some((node_id, failed)) = nodes.iter().find(|(_, out)| !out.ok) else {
3881        return output;
3882    };
3883    let err_meta = failed.meta.get("error");
3884    let message = err_meta
3885        .and_then(|e| e.get("message"))
3886        .and_then(|v| v.as_str())
3887        .unwrap_or("flow node failed");
3888    let kind = err_meta
3889        .and_then(|e| e.get("kind"))
3890        .and_then(|v| v.as_str())
3891        .unwrap_or("flow_node_failed");
3892
3893    let mut output = match output {
3894        Value::Object(map) => map,
3895        Value::Null => JsonMap::new(),
3896        other => {
3897            let mut wrap = JsonMap::new();
3898            wrap.insert("payload".to_string(), other);
3899            wrap
3900        }
3901    };
3902    let metadata_entry = output
3903        .entry("metadata".to_string())
3904        .or_insert_with(|| Value::Object(JsonMap::new()));
3905    let metadata_map = match metadata_entry {
3906        Value::Object(map) => map,
3907        _ => {
3908            *metadata_entry = Value::Object(JsonMap::new());
3909            metadata_entry.as_object_mut().unwrap()
3910        }
3911    };
3912    metadata_map
3913        .entry("error_kind".to_string())
3914        .or_insert(Value::String(kind.to_string()));
3915    metadata_map
3916        .entry("error_message".to_string())
3917        .or_insert(Value::String(message.to_string()));
3918    metadata_map
3919        .entry("node_id".to_string())
3920        .or_insert(Value::String(node_id.clone()));
3921    Value::Object(output)
3922}
3923
3924fn should_retry(err: &anyhow::Error) -> bool {
3925    let lower = err.to_string().to_lowercase();
3926    lower.contains("transient")
3927        || lower.contains("unavailable")
3928        || lower.contains("internal")
3929        || lower.contains("timeout")
3930}
3931
3932impl From<FlowRetryConfig> for RetryConfig {
3933    fn from(value: FlowRetryConfig) -> Self {
3934        Self {
3935            max_attempts: value.max_attempts.max(1),
3936            base_delay_ms: value.base_delay_ms.max(50),
3937        }
3938    }
3939}