Skip to main content

greentic_runner_host/runner/
engine.rs

1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use super::templating::{TemplateOptions, render_template_value};
18use crate::config::{FlowRetryConfig, HostConfig};
19use crate::pack::{FlowDescriptor, PackRuntime};
20use crate::runner::invocation::{InvocationMeta, build_invocation_envelope};
21use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
22#[cfg(feature = "fault-injection")]
23use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
24use crate::validate::{
25    ValidationConfig, ValidationIssue, ValidationMode, validate_component_envelope,
26    validate_tool_envelope,
27};
28use greentic_types::{Flow, Node, NodeId, Routing};
29
30pub struct FlowEngine {
31    packs: Vec<Arc<PackRuntime>>,
32    flows: Vec<FlowDescriptor>,
33    flow_sources: HashMap<FlowKey, usize>,
34    flow_cache: RwLock<HashMap<FlowKey, HostFlow>>,
35    default_env: String,
36    validation: ValidationConfig,
37}
38
39#[derive(Clone, Debug, PartialEq, Eq, Hash)]
40struct FlowKey {
41    pack_id: String,
42    flow_id: String,
43}
44
45#[derive(Clone, Debug, Serialize, Deserialize)]
46pub struct FlowSnapshot {
47    pub pack_id: String,
48    pub flow_id: String,
49    #[serde(default, skip_serializing_if = "Option::is_none")]
50    pub next_flow: Option<String>,
51    pub next_node: String,
52    pub state: ExecutionState,
53}
54
55#[derive(Clone, Debug)]
56pub struct FlowWait {
57    pub reason: Option<String>,
58    pub snapshot: FlowSnapshot,
59}
60
61#[derive(Clone, Debug)]
62pub enum FlowStatus {
63    Completed,
64    Waiting(Box<FlowWait>),
65}
66
67#[derive(Clone, Debug)]
68pub struct FlowExecution {
69    pub output: Value,
70    pub status: FlowStatus,
71}
72
73#[derive(Clone, Debug)]
74struct HostFlow {
75    id: String,
76    start: Option<NodeId>,
77    nodes: IndexMap<NodeId, HostNode>,
78}
79
80#[derive(Clone, Debug)]
81pub struct HostNode {
82    kind: NodeKind,
83    /// Backwards-compatible component label for observers/transcript.
84    pub component: String,
85    component_id: String,
86    operation_name: Option<String>,
87    operation_in_mapping: Option<String>,
88    payload_expr: Value,
89    routing: Routing,
90}
91
92impl HostNode {
93    pub fn component_id(&self) -> &str {
94        &self.component_id
95    }
96
97    pub fn operation_name(&self) -> Option<&str> {
98        self.operation_name.as_deref()
99    }
100
101    pub fn operation_in_mapping(&self) -> Option<&str> {
102        self.operation_in_mapping.as_deref()
103    }
104}
105
106#[derive(Clone, Debug)]
107enum NodeKind {
108    Exec { target_component: String },
109    PackComponent { component_ref: String },
110    ProviderInvoke,
111    FlowCall,
112    BuiltinEmit { kind: EmitKind },
113    Wait,
114}
115
116#[derive(Clone, Debug)]
117enum EmitKind {
118    Log,
119    Response,
120    Other(String),
121}
122
123struct ComponentOverrides<'a> {
124    component: Option<&'a str>,
125    operation: Option<&'a str>,
126}
127
128struct ComponentCall {
129    component_ref: String,
130    operation: String,
131    input: Value,
132    config: Value,
133}
134
135impl FlowExecution {
136    fn completed(output: Value) -> Self {
137        Self {
138            output,
139            status: FlowStatus::Completed,
140        }
141    }
142
143    fn waiting(output: Value, wait: FlowWait) -> Self {
144        Self {
145            output,
146            status: FlowStatus::Waiting(Box::new(wait)),
147        }
148    }
149}
150
151impl FlowEngine {
152    pub async fn new(packs: Vec<Arc<PackRuntime>>, config: Arc<HostConfig>) -> Result<Self> {
153        let mut flow_sources: HashMap<FlowKey, usize> = HashMap::new();
154        let mut descriptors = Vec::new();
155        let mut bindings = HashMap::new();
156        for pack in &config.pack_bindings {
157            bindings.insert(pack.pack_id.clone(), pack.flows.clone());
158        }
159        let enforce_bindings = !bindings.is_empty();
160        for (idx, pack) in packs.iter().enumerate() {
161            let pack_id = pack.metadata().pack_id.clone();
162            if enforce_bindings && !bindings.contains_key(&pack_id) {
163                bail!("no gtbind entries found for pack {}", pack_id);
164            }
165            let flows = pack.list_flows().await?;
166            let allowed = bindings.get(&pack_id).map(|flows| {
167                flows
168                    .iter()
169                    .cloned()
170                    .collect::<std::collections::HashSet<_>>()
171            });
172            let mut seen = std::collections::HashSet::new();
173            for flow in flows {
174                if let Some(ref allow) = allowed
175                    && !allow.contains(&flow.id)
176                {
177                    continue;
178                }
179                seen.insert(flow.id.clone());
180                tracing::info!(
181                    flow_id = %flow.id,
182                    flow_type = %flow.flow_type,
183                    pack_id = %flow.pack_id,
184                    pack_index = idx,
185                    "registered flow"
186                );
187                flow_sources.insert(
188                    FlowKey {
189                        pack_id: flow.pack_id.clone(),
190                        flow_id: flow.id.clone(),
191                    },
192                    idx,
193                );
194                descriptors.retain(|existing: &FlowDescriptor| {
195                    !(existing.id == flow.id && existing.pack_id == flow.pack_id)
196                });
197                descriptors.push(flow);
198            }
199            if let Some(allow) = allowed {
200                let missing = allow.difference(&seen).cloned().collect::<Vec<_>>();
201                if !missing.is_empty() {
202                    bail!(
203                        "gtbind flow ids missing in pack {}: {}",
204                        pack_id,
205                        missing.join(", ")
206                    );
207                }
208            }
209        }
210
211        let mut flow_map = HashMap::new();
212        for flow in &descriptors {
213            let pack_id = flow.pack_id.clone();
214            if let Some(&pack_idx) = flow_sources.get(&FlowKey {
215                pack_id: pack_id.clone(),
216                flow_id: flow.id.clone(),
217            }) {
218                let pack_clone = Arc::clone(&packs[pack_idx]);
219                let flow_id = flow.id.clone();
220                let task_flow_id = flow_id.clone();
221                match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
222                    Ok(Ok(loaded_flow)) => {
223                        flow_map.insert(
224                            FlowKey {
225                                pack_id: pack_id.clone(),
226                                flow_id,
227                            },
228                            HostFlow::from(loaded_flow),
229                        );
230                    }
231                    Ok(Err(err)) => {
232                        tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
233                    }
234                    Err(err) => {
235                        tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
236                    }
237                }
238            }
239        }
240
241        Ok(Self {
242            packs,
243            flows: descriptors,
244            flow_sources,
245            flow_cache: RwLock::new(flow_map),
246            default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
247            validation: config.validation.clone(),
248        })
249    }
250
251    async fn get_or_load_flow(&self, pack_id: &str, flow_id: &str) -> Result<HostFlow> {
252        let key = FlowKey {
253            pack_id: pack_id.to_string(),
254            flow_id: flow_id.to_string(),
255        };
256        if let Some(flow) = self.flow_cache.read().get(&key).cloned() {
257            return Ok(flow);
258        }
259
260        let pack_idx = *self
261            .flow_sources
262            .get(&key)
263            .with_context(|| format!("flow {pack_id}:{flow_id} not registered"))?;
264        let pack = Arc::clone(&self.packs[pack_idx]);
265        let flow_id_owned = flow_id.to_string();
266        let task_flow_id = flow_id_owned.clone();
267        let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
268            .await
269            .context("failed to join flow metadata task")??;
270        let host_flow = HostFlow::from(flow);
271        self.flow_cache.write().insert(
272            FlowKey {
273                pack_id: pack_id.to_string(),
274                flow_id: flow_id_owned.clone(),
275            },
276            host_flow.clone(),
277        );
278        Ok(host_flow)
279    }
280
281    pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
282        let span = tracing::info_span!(
283            "flow.execute",
284            tenant = tracing::field::Empty,
285            flow_id = tracing::field::Empty,
286            node_id = tracing::field::Empty,
287            tool = tracing::field::Empty,
288            action = tracing::field::Empty
289        );
290        annotate_span(
291            &span,
292            &FlowSpanAttributes {
293                tenant: ctx.tenant,
294                flow_id: ctx.flow_id,
295                node_id: ctx.node_id,
296                tool: ctx.tool,
297                action: ctx.action,
298            },
299        );
300        set_flow_context(
301            &self.default_env,
302            ctx.tenant,
303            ctx.flow_id,
304            ctx.node_id,
305            ctx.provider_id,
306            ctx.session_id,
307        );
308        let retry_config = ctx.retry_config;
309        let original_input = input;
310        let mut ctx = ctx;
311        async move {
312            let mut attempt = 0u32;
313            loop {
314                attempt += 1;
315                ctx.attempt = attempt;
316                #[cfg(feature = "fault-injection")]
317                {
318                    let fault_ctx = FaultContext {
319                        pack_id: ctx.pack_id,
320                        flow_id: ctx.flow_id,
321                        node_id: ctx.node_id,
322                        attempt: ctx.attempt,
323                    };
324                    maybe_fail(FaultPoint::Timeout, fault_ctx)
325                        .map_err(|err| anyhow!(err.to_string()))?;
326                }
327                match self.execute_once(&ctx, original_input.clone()).await {
328                    Ok(value) => return Ok(value),
329                    Err(err) => {
330                        if attempt >= retry_config.max_attempts || !should_retry(&err) {
331                            return Err(err);
332                        }
333                        let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
334                        tracing::warn!(
335                            tenant = ctx.tenant,
336                            flow_id = ctx.flow_id,
337                            attempt,
338                            max_attempts = retry_config.max_attempts,
339                            delay_ms = delay,
340                            error = %err,
341                            "transient flow execution failure, backing off"
342                        );
343                        tokio::time::sleep(Duration::from_millis(delay)).await;
344                    }
345                }
346            }
347        }
348        .instrument(span)
349        .await
350    }
351
352    pub async fn resume(
353        &self,
354        ctx: FlowContext<'_>,
355        snapshot: FlowSnapshot,
356        input: Value,
357    ) -> Result<FlowExecution> {
358        if snapshot.pack_id != ctx.pack_id {
359            bail!(
360                "snapshot pack {} does not match requested {}",
361                snapshot.pack_id,
362                ctx.pack_id
363            );
364        }
365        let resume_flow = snapshot
366            .next_flow
367            .clone()
368            .unwrap_or_else(|| snapshot.flow_id.clone());
369        let flow_ir = self.get_or_load_flow(ctx.pack_id, &resume_flow).await?;
370        let mut state = snapshot.state;
371        state.replace_input(input);
372        state.ensure_entry();
373        self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node), resume_flow)
374            .await
375    }
376
377    async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
378        let flow_ir = self.get_or_load_flow(ctx.pack_id, ctx.flow_id).await?;
379        let state = ExecutionState::new(input);
380        self.drive_flow(ctx, flow_ir, state, None, ctx.flow_id.to_string())
381            .await
382    }
383
384    async fn drive_flow(
385        &self,
386        ctx: &FlowContext<'_>,
387        mut flow_ir: HostFlow,
388        mut state: ExecutionState,
389        resume_from: Option<String>,
390        mut current_flow_id: String,
391    ) -> Result<FlowExecution> {
392        let mut current = match resume_from {
393            Some(node) => NodeId::from_str(&node)
394                .with_context(|| format!("invalid resume node id `{node}`"))?,
395            None => flow_ir
396                .start
397                .clone()
398                .or_else(|| flow_ir.nodes.keys().next().cloned())
399                .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
400        };
401
402        loop {
403            let step_ctx = FlowContext {
404                tenant: ctx.tenant,
405                pack_id: ctx.pack_id,
406                flow_id: current_flow_id.as_str(),
407                node_id: ctx.node_id,
408                tool: ctx.tool,
409                action: ctx.action,
410                session_id: ctx.session_id,
411                provider_id: ctx.provider_id,
412                retry_config: ctx.retry_config,
413                attempt: ctx.attempt,
414                observer: ctx.observer,
415                mocks: ctx.mocks,
416            };
417            let node = flow_ir
418                .nodes
419                .get(&current)
420                .with_context(|| format!("node {} not found", current.as_str()))?;
421
422            let payload_template = node.payload_expr.clone();
423            let prev = state
424                .last_output
425                .as_ref()
426                .cloned()
427                .unwrap_or_else(|| Value::Object(JsonMap::new()));
428            let ctx_value = template_context(&state, prev);
429            #[cfg(feature = "fault-injection")]
430            {
431                let fault_ctx = FaultContext {
432                    pack_id: ctx.pack_id,
433                    flow_id: ctx.flow_id,
434                    node_id: Some(current.as_str()),
435                    attempt: ctx.attempt,
436                };
437                maybe_fail(FaultPoint::TemplateRender, fault_ctx)
438                    .map_err(|err| anyhow!(err.to_string()))?;
439            }
440            let payload =
441                render_template_value(&payload_template, &ctx_value, TemplateOptions::default())
442                    .context("failed to render node input template")?;
443            let observed_payload = payload.clone();
444            let node_id = current.clone();
445            let event = NodeEvent {
446                context: &step_ctx,
447                node_id: node_id.as_str(),
448                node,
449                payload: &observed_payload,
450            };
451            if let Some(observer) = step_ctx.observer {
452                observer.on_node_start(&event);
453            }
454            let dispatch = self
455                .dispatch_node(
456                    &step_ctx,
457                    node_id.as_str(),
458                    node,
459                    &mut state,
460                    payload,
461                    &event,
462                )
463                .await;
464            let DispatchOutcome { output, control } = match dispatch {
465                Ok(outcome) => outcome,
466                Err(err) => {
467                    if let Some(observer) = step_ctx.observer {
468                        observer.on_node_error(&event, err.as_ref());
469                    }
470                    return Err(err);
471                }
472            };
473
474            state.nodes.insert(node_id.clone().into(), output.clone());
475            state.last_output = Some(output.payload.clone());
476            if let Some(observer) = step_ctx.observer {
477                observer.on_node_end(&event, &output.payload);
478            }
479
480            match control {
481                NodeControl::Continue => {
482                    let (next, should_exit) = match &node.routing {
483                        Routing::Next { node_id } => (Some(node_id.clone()), false),
484                        Routing::End | Routing::Reply => (None, true),
485                        Routing::Branch { default, .. } => (default.clone(), default.is_none()),
486                        Routing::Custom(raw) => {
487                            tracing::warn!(
488                                flow_id = %flow_ir.id,
489                                node_id = %node_id,
490                                routing = ?raw,
491                                "unsupported routing; terminating flow"
492                            );
493                            (None, true)
494                        }
495                    };
496
497                    if should_exit {
498                        return Ok(FlowExecution::completed(
499                            state.finalize_with(Some(output.payload.clone())),
500                        ));
501                    }
502
503                    match next {
504                        Some(n) => current = n,
505                        None => {
506                            return Ok(FlowExecution::completed(
507                                state.finalize_with(Some(output.payload.clone())),
508                            ));
509                        }
510                    }
511                }
512                NodeControl::Wait { reason } => {
513                    let (next, _) = match &node.routing {
514                        Routing::Next { node_id } => (Some(node_id.clone()), false),
515                        Routing::End | Routing::Reply => (None, true),
516                        Routing::Branch { default, .. } => (default.clone(), default.is_none()),
517                        Routing::Custom(raw) => {
518                            tracing::warn!(
519                                flow_id = %flow_ir.id,
520                                node_id = %node_id,
521                                routing = ?raw,
522                                "unsupported routing for wait; terminating flow"
523                            );
524                            (None, true)
525                        }
526                    };
527                    let resume_target = next.ok_or_else(|| {
528                        anyhow!(
529                            "session.wait node {} requires a non-empty route",
530                            current.as_str()
531                        )
532                    })?;
533                    let mut snapshot_state = state.clone();
534                    snapshot_state.clear_egress();
535                    let snapshot = FlowSnapshot {
536                        pack_id: step_ctx.pack_id.to_string(),
537                        flow_id: step_ctx.flow_id.to_string(),
538                        next_flow: (current_flow_id != step_ctx.flow_id)
539                            .then_some(current_flow_id.clone()),
540                        next_node: resume_target.as_str().to_string(),
541                        state: snapshot_state,
542                    };
543                    let output_value = state.clone().finalize_with(None);
544                    return Ok(FlowExecution::waiting(
545                        output_value,
546                        FlowWait { reason, snapshot },
547                    ));
548                }
549                NodeControl::Jump(jump) => {
550                    let jump_target = self.apply_jump(&step_ctx, &mut state, jump).await?;
551                    flow_ir = jump_target.flow;
552                    current_flow_id = jump_target.flow_id;
553                    current = jump_target.node_id;
554                }
555                NodeControl::Respond {
556                    text,
557                    card_cbor,
558                    needs_user,
559                } => {
560                    let response = json!({
561                        "text": text,
562                        "card_cbor": card_cbor,
563                        "needs_user": needs_user,
564                    });
565                    state.push_egress(response);
566                    return Ok(FlowExecution::completed(state.finalize_with(None)));
567                }
568            }
569        }
570    }
571
572    async fn dispatch_node(
573        &self,
574        ctx: &FlowContext<'_>,
575        node_id: &str,
576        node: &HostNode,
577        state: &mut ExecutionState,
578        payload: Value,
579        event: &NodeEvent<'_>,
580    ) -> Result<DispatchOutcome> {
581        match &node.kind {
582            NodeKind::Exec { target_component } => self
583                .execute_component_exec(
584                    ctx,
585                    node_id,
586                    node,
587                    payload,
588                    event,
589                    ComponentOverrides {
590                        component: Some(target_component.as_str()),
591                        operation: node.operation_name.as_deref(),
592                    },
593                )
594                .await
595                .and_then(component_dispatch_outcome),
596            NodeKind::PackComponent { component_ref } => self
597                .execute_component_call(ctx, node_id, node, payload, component_ref.as_str(), event)
598                .await
599                .and_then(component_dispatch_outcome),
600            NodeKind::FlowCall => self
601                .execute_flow_call(ctx, payload)
602                .await
603                .map(DispatchOutcome::complete),
604            NodeKind::ProviderInvoke => self
605                .execute_provider_invoke(ctx, node_id, state, payload, event)
606                .await
607                .map(DispatchOutcome::complete),
608            NodeKind::BuiltinEmit { kind } => {
609                match kind {
610                    EmitKind::Log | EmitKind::Response => {}
611                    EmitKind::Other(component) => {
612                        tracing::debug!(%component, "handling emit.* as builtin");
613                    }
614                }
615                state.push_egress(payload.clone());
616                Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
617            }
618            NodeKind::Wait => {
619                let reason = extract_wait_reason(&payload);
620                Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
621            }
622        }
623    }
624
625    async fn apply_jump(
626        &self,
627        ctx: &FlowContext<'_>,
628        state: &mut ExecutionState,
629        jump: JumpControl,
630    ) -> Result<JumpTarget> {
631        let target_flow = jump.flow.trim();
632        if target_flow.is_empty() {
633            bail!("missing_flow");
634        }
635
636        let flow = self
637            .get_or_load_flow(ctx.pack_id, target_flow)
638            .await
639            .with_context(|| format!("unknown_flow:{target_flow}"))?;
640
641        let target_node = if let Some(node) = jump.node.as_deref() {
642            let parsed = NodeId::from_str(node).with_context(|| format!("unknown_node:{node}"))?;
643            if !flow.nodes.contains_key(&parsed) {
644                bail!("unknown_node:{node}");
645            }
646            parsed
647        } else {
648            flow.start
649                .clone()
650                .or_else(|| flow.nodes.keys().next().cloned())
651                .ok_or_else(|| anyhow!("jump_failed: flow {target_flow} has no start node"))?
652        };
653
654        let max_redirects = jump.max_redirects.unwrap_or(3);
655        if state.redirect_count() >= max_redirects {
656            bail!("redirect_limit");
657        }
658        state.increment_redirect_count();
659        state.replace_input(jump.payload.clone());
660        state.last_output = Some(jump.payload);
661        tracing::info!(
662            flow_id = %ctx.flow_id,
663            target_flow = %target_flow,
664            target_node = %target_node.as_str(),
665            reason = ?jump.reason,
666            redirects = state.redirect_count(),
667            "flow.jump.applied"
668        );
669
670        Ok(JumpTarget {
671            flow_id: target_flow.to_string(),
672            flow,
673            node_id: target_node,
674        })
675    }
676
677    async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
678        #[derive(Deserialize)]
679        struct FlowCallPayload {
680            #[serde(alias = "flow")]
681            flow_id: String,
682            #[serde(default)]
683            input: Value,
684        }
685
686        let call: FlowCallPayload =
687            serde_json::from_value(payload).context("invalid payload for flow.call node")?;
688        if call.flow_id.trim().is_empty() {
689            bail!("flow.call requires a non-empty flow_id");
690        }
691
692        let sub_input = if call.input.is_null() {
693            Value::Null
694        } else {
695            call.input
696        };
697
698        let flow_id_owned = call.flow_id;
699        let action = "flow.call";
700        let sub_ctx = FlowContext {
701            tenant: ctx.tenant,
702            pack_id: ctx.pack_id,
703            flow_id: flow_id_owned.as_str(),
704            node_id: None,
705            tool: ctx.tool,
706            action: Some(action),
707            session_id: ctx.session_id,
708            provider_id: ctx.provider_id,
709            retry_config: ctx.retry_config,
710            attempt: ctx.attempt,
711            observer: ctx.observer,
712            mocks: ctx.mocks,
713        };
714
715        let execution = Box::pin(self.execute(sub_ctx, sub_input))
716            .await
717            .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
718        match execution.status {
719            FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
720            FlowStatus::Waiting(wait) => bail!(
721                "flow.call cannot pause (flow {} waiting {:?})",
722                flow_id_owned,
723                wait.reason
724            ),
725        }
726    }
727
728    async fn execute_component_exec(
729        &self,
730        ctx: &FlowContext<'_>,
731        node_id: &str,
732        node: &HostNode,
733        payload: Value,
734        event: &NodeEvent<'_>,
735        overrides: ComponentOverrides<'_>,
736    ) -> Result<NodeOutput> {
737        #[derive(Deserialize)]
738        struct ComponentPayload {
739            #[serde(default, alias = "component_ref", alias = "component")]
740            component: Option<String>,
741            #[serde(alias = "op")]
742            operation: Option<String>,
743            #[serde(default)]
744            input: Value,
745            #[serde(default)]
746            config: Value,
747        }
748
749        let payload: ComponentPayload =
750            serde_json::from_value(payload).context("invalid payload for component.exec")?;
751        let component_ref = overrides
752            .component
753            .map(str::to_string)
754            .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
755            .with_context(|| "component.exec requires a component_ref")?;
756        let operation = resolve_component_operation(
757            node_id,
758            node.component_id.as_str(),
759            payload.operation,
760            overrides.operation,
761            node.operation_in_mapping.as_deref(),
762        )?;
763        let call = ComponentCall {
764            component_ref,
765            operation,
766            input: payload.input,
767            config: payload.config,
768        };
769
770        self.invoke_component_call(ctx, node_id, call, event).await
771    }
772
773    async fn execute_component_call(
774        &self,
775        ctx: &FlowContext<'_>,
776        node_id: &str,
777        node: &HostNode,
778        payload: Value,
779        component_ref: &str,
780        event: &NodeEvent<'_>,
781    ) -> Result<NodeOutput> {
782        let payload_operation = extract_operation_from_mapping(&payload);
783        let (input, config) = split_operation_payload(payload);
784        let operation = resolve_component_operation(
785            node_id,
786            node.component_id.as_str(),
787            payload_operation,
788            node.operation_name.as_deref(),
789            node.operation_in_mapping.as_deref(),
790        )?;
791        let call = ComponentCall {
792            component_ref: component_ref.to_string(),
793            operation,
794            input,
795            config,
796        };
797        self.invoke_component_call(ctx, node_id, call, event).await
798    }
799
800    async fn invoke_component_call(
801        &self,
802        ctx: &FlowContext<'_>,
803        node_id: &str,
804        mut call: ComponentCall,
805        event: &NodeEvent<'_>,
806    ) -> Result<NodeOutput> {
807        self.validate_component(ctx, event, &call)?;
808        let key = FlowKey {
809            pack_id: ctx.pack_id.to_string(),
810            flow_id: ctx.flow_id.to_string(),
811        };
812        let pack_idx = *self.flow_sources.get(&key).with_context(|| {
813            format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
814        })?;
815        let pack = Arc::clone(&self.packs[pack_idx]);
816
817        // Pre-resolve card asset paths: read JSON files from the pack's assets
818        // directory and inject as inline_json so the component doesn't need
819        // WASI filesystem access.
820        resolve_card_assets(&mut call.input, &pack);
821
822        // When the input is a card-like invocation (has card_source/card_spec),
823        // pass it directly to the component instead of wrapping in an
824        // InvocationEnvelope.  The envelope serialises the payload field as a
825        // byte array which the component cannot decode back, and the
826        // InvocationPayload::parse heuristic strips domain fields when a
827        // `payload` key is present (e.g.  the card's Handlebars template
828        // context `payload: {}`).
829        let is_card = is_card_invocation(&call.input);
830
831        let input_json = if is_card {
832            serde_json::to_string(&call.input)?
833        } else {
834            // Runtime owns ctx; flows must not embed ctx, even if they provide envelopes.
835            let meta = InvocationMeta {
836                env: &self.default_env,
837                tenant: ctx.tenant,
838                flow_id: ctx.flow_id,
839                node_id: Some(node_id),
840                provider_id: ctx.provider_id,
841                session_id: ctx.session_id,
842                attempt: ctx.attempt,
843            };
844            let invocation_envelope =
845                build_invocation_envelope(meta, call.operation.as_str(), call.input)
846                    .context("build invocation envelope for component call")?;
847            serde_json::to_string(&invocation_envelope)?
848        };
849        let config_json = if call.config.is_null() {
850            None
851        } else {
852            Some(serde_json::to_string(&call.config)?)
853        };
854
855        let exec_ctx = component_exec_ctx(ctx, node_id);
856        #[cfg(feature = "fault-injection")]
857        {
858            let fault_ctx = FaultContext {
859                pack_id: ctx.pack_id,
860                flow_id: ctx.flow_id,
861                node_id: Some(node_id),
862                attempt: ctx.attempt,
863            };
864            maybe_fail(FaultPoint::BeforeComponentCall, fault_ctx)
865                .map_err(|err| anyhow!(err.to_string()))?;
866        }
867        let value = pack
868            .invoke_component(
869                call.component_ref.as_str(),
870                exec_ctx,
871                call.operation.as_str(),
872                config_json,
873                input_json,
874            )
875            .await?;
876        #[cfg(feature = "fault-injection")]
877        {
878            let fault_ctx = FaultContext {
879                pack_id: ctx.pack_id,
880                flow_id: ctx.flow_id,
881                node_id: Some(node_id),
882                attempt: ctx.attempt,
883            };
884            maybe_fail(FaultPoint::AfterComponentCall, fault_ctx)
885                .map_err(|err| anyhow!(err.to_string()))?;
886        }
887
888        if let Some((code, message)) = component_error(&value) {
889            bail!(
890                "component {} failed: {}: {}",
891                call.component_ref,
892                code,
893                message
894            );
895        }
896        Ok(NodeOutput::new(value))
897    }
898
899    async fn execute_provider_invoke(
900        &self,
901        ctx: &FlowContext<'_>,
902        node_id: &str,
903        state: &ExecutionState,
904        payload: Value,
905        event: &NodeEvent<'_>,
906    ) -> Result<NodeOutput> {
907        #[derive(Deserialize)]
908        struct ProviderPayload {
909            #[serde(default)]
910            provider_id: Option<String>,
911            #[serde(default)]
912            provider_type: Option<String>,
913            #[serde(default, alias = "operation")]
914            op: Option<String>,
915            #[serde(default)]
916            input: Value,
917            #[serde(default)]
918            in_map: Value,
919            #[serde(default)]
920            out_map: Value,
921            #[serde(default)]
922            err_map: Value,
923        }
924
925        let payload: ProviderPayload =
926            serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
927        let op = payload
928            .op
929            .as_deref()
930            .filter(|v| !v.trim().is_empty())
931            .with_context(|| "provider.invoke requires an op")?
932            .to_string();
933
934        let prev = state
935            .last_output
936            .as_ref()
937            .cloned()
938            .unwrap_or_else(|| Value::Object(JsonMap::new()));
939        let base_ctx = template_context(state, prev);
940
941        let input_value = if !payload.in_map.is_null() {
942            let mut ctx_value = base_ctx.clone();
943            if let Value::Object(ref mut map) = ctx_value {
944                map.insert("input".into(), payload.input.clone());
945                map.insert("result".into(), payload.input.clone());
946            }
947            render_template_value(
948                &payload.in_map,
949                &ctx_value,
950                TemplateOptions {
951                    allow_pointer: true,
952                },
953            )
954            .context("failed to render provider.invoke in_map")?
955        } else if !payload.input.is_null() {
956            payload.input
957        } else {
958            Value::Null
959        };
960        let input_json = serde_json::to_vec(&input_value)?;
961
962        self.validate_tool(
963            ctx,
964            event,
965            payload.provider_id.as_deref(),
966            payload.provider_type.as_deref(),
967            &op,
968            &input_value,
969        )?;
970
971        let key = FlowKey {
972            pack_id: ctx.pack_id.to_string(),
973            flow_id: ctx.flow_id.to_string(),
974        };
975        let pack_idx = *self.flow_sources.get(&key).with_context(|| {
976            format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
977        })?;
978        let pack = Arc::clone(&self.packs[pack_idx]);
979        let binding = pack.resolve_provider(
980            payload.provider_id.as_deref(),
981            payload.provider_type.as_deref(),
982        )?;
983        let exec_ctx = component_exec_ctx(ctx, node_id);
984        #[cfg(feature = "fault-injection")]
985        {
986            let fault_ctx = FaultContext {
987                pack_id: ctx.pack_id,
988                flow_id: ctx.flow_id,
989                node_id: Some(node_id),
990                attempt: ctx.attempt,
991            };
992            maybe_fail(FaultPoint::BeforeToolCall, fault_ctx)
993                .map_err(|err| anyhow!(err.to_string()))?;
994        }
995        let result = pack
996            .invoke_provider(&binding, exec_ctx, &op, input_json)
997            .await?;
998        #[cfg(feature = "fault-injection")]
999        {
1000            let fault_ctx = FaultContext {
1001                pack_id: ctx.pack_id,
1002                flow_id: ctx.flow_id,
1003                node_id: Some(node_id),
1004                attempt: ctx.attempt,
1005            };
1006            maybe_fail(FaultPoint::AfterToolCall, fault_ctx)
1007                .map_err(|err| anyhow!(err.to_string()))?;
1008        }
1009
1010        let output = if payload.out_map.is_null() {
1011            result
1012        } else {
1013            let mut ctx_value = base_ctx;
1014            if let Value::Object(ref mut map) = ctx_value {
1015                map.insert("input".into(), result.clone());
1016                map.insert("result".into(), result.clone());
1017            }
1018            render_template_value(
1019                &payload.out_map,
1020                &ctx_value,
1021                TemplateOptions {
1022                    allow_pointer: true,
1023                },
1024            )
1025            .context("failed to render provider.invoke out_map")?
1026        };
1027        let _ = payload.err_map;
1028        Ok(NodeOutput::new(output))
1029    }
1030
1031    fn validate_component(
1032        &self,
1033        ctx: &FlowContext<'_>,
1034        event: &NodeEvent<'_>,
1035        call: &ComponentCall,
1036    ) -> Result<()> {
1037        if self.validation.mode == ValidationMode::Off {
1038            return Ok(());
1039        }
1040        let mut metadata = JsonMap::new();
1041        metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
1042        if let Some(id) = ctx.session_id {
1043            metadata.insert("session".to_string(), json!({ "id": id }));
1044        }
1045        let envelope = json!({
1046            "component_id": call.component_ref,
1047            "operation": call.operation,
1048            "input": call.input,
1049            "config": call.config,
1050            "metadata": Value::Object(metadata),
1051        });
1052        let issues = validate_component_envelope(&envelope);
1053        self.report_validation(ctx, event, "component", issues)
1054    }
1055
1056    fn validate_tool(
1057        &self,
1058        ctx: &FlowContext<'_>,
1059        event: &NodeEvent<'_>,
1060        provider_id: Option<&str>,
1061        provider_type: Option<&str>,
1062        operation: &str,
1063        input: &Value,
1064    ) -> Result<()> {
1065        if self.validation.mode == ValidationMode::Off {
1066            return Ok(());
1067        }
1068        let tool_id = provider_id.or(provider_type).unwrap_or("provider.invoke");
1069        let mut metadata = JsonMap::new();
1070        metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
1071        if let Some(id) = ctx.session_id {
1072            metadata.insert("session".to_string(), json!({ "id": id }));
1073        }
1074        let envelope = json!({
1075            "tool_id": tool_id,
1076            "operation": operation,
1077            "input": input,
1078            "metadata": Value::Object(metadata),
1079        });
1080        let issues = validate_tool_envelope(&envelope);
1081        self.report_validation(ctx, event, "tool", issues)
1082    }
1083
1084    fn report_validation(
1085        &self,
1086        ctx: &FlowContext<'_>,
1087        event: &NodeEvent<'_>,
1088        kind: &str,
1089        issues: Vec<ValidationIssue>,
1090    ) -> Result<()> {
1091        if issues.is_empty() {
1092            return Ok(());
1093        }
1094        if let Some(observer) = ctx.observer {
1095            observer.on_validation(event, &issues);
1096        }
1097        match self.validation.mode {
1098            ValidationMode::Warn => {
1099                tracing::warn!(
1100                    tenant = ctx.tenant,
1101                    flow_id = ctx.flow_id,
1102                    node_id = event.node_id,
1103                    kind,
1104                    issues = ?issues,
1105                    "invocation envelope validation issues"
1106                );
1107                Ok(())
1108            }
1109            ValidationMode::Error => {
1110                tracing::error!(
1111                    tenant = ctx.tenant,
1112                    flow_id = ctx.flow_id,
1113                    node_id = event.node_id,
1114                    kind,
1115                    issues = ?issues,
1116                    "invocation envelope validation failed"
1117                );
1118                bail!("invocation_validation_failed");
1119            }
1120            ValidationMode::Off => Ok(()),
1121        }
1122    }
1123
1124    pub fn flows(&self) -> &[FlowDescriptor] {
1125        &self.flows
1126    }
1127
1128    pub fn flow_by_key(&self, pack_id: &str, flow_id: &str) -> Option<&FlowDescriptor> {
1129        self.flows
1130            .iter()
1131            .find(|descriptor| descriptor.pack_id == pack_id && descriptor.id == flow_id)
1132    }
1133
1134    pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
1135        let mut matches = self
1136            .flows
1137            .iter()
1138            .filter(|descriptor| descriptor.flow_type == flow_type);
1139        let first = matches.next()?;
1140        if matches.next().is_some() {
1141            return None;
1142        }
1143        Some(first)
1144    }
1145
1146    pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
1147        let mut matches = self
1148            .flows
1149            .iter()
1150            .filter(|descriptor| descriptor.id == flow_id);
1151        let first = matches.next()?;
1152        if matches.next().is_some() {
1153            return None;
1154        }
1155        Some(first)
1156    }
1157}
1158
1159pub trait ExecutionObserver: Send + Sync {
1160    fn on_node_start(&self, event: &NodeEvent<'_>);
1161    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
1162    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
1163    fn on_validation(&self, _event: &NodeEvent<'_>, _issues: &[ValidationIssue]) {}
1164}
1165
1166pub struct NodeEvent<'a> {
1167    pub context: &'a FlowContext<'a>,
1168    pub node_id: &'a str,
1169    pub node: &'a HostNode,
1170    pub payload: &'a Value,
1171}
1172
1173#[derive(Clone, Debug, Serialize, Deserialize)]
1174pub struct ExecutionState {
1175    #[serde(default)]
1176    entry: Value,
1177    #[serde(default)]
1178    input: Value,
1179    #[serde(default)]
1180    nodes: HashMap<String, NodeOutput>,
1181    #[serde(default)]
1182    egress: Vec<Value>,
1183    #[serde(default, skip_serializing_if = "Option::is_none")]
1184    last_output: Option<Value>,
1185    #[serde(default)]
1186    redirect_count: u32,
1187}
1188
1189impl ExecutionState {
1190    fn new(input: Value) -> Self {
1191        Self {
1192            entry: input.clone(),
1193            input,
1194            nodes: HashMap::new(),
1195            egress: Vec::new(),
1196            last_output: None,
1197            redirect_count: 0,
1198        }
1199    }
1200
1201    fn ensure_entry(&mut self) {
1202        if self.entry.is_null() {
1203            self.entry = self.input.clone();
1204        }
1205    }
1206
1207    fn context(&self) -> Value {
1208        let mut nodes = JsonMap::new();
1209        for (id, output) in &self.nodes {
1210            nodes.insert(
1211                id.clone(),
1212                json!({
1213                    "ok": output.ok,
1214                    "payload": output.payload.clone(),
1215                    "meta": output.meta.clone(),
1216                }),
1217            );
1218        }
1219        json!({
1220            "entry": self.entry.clone(),
1221            "input": self.input.clone(),
1222            "nodes": nodes,
1223            "redirect_count": self.redirect_count,
1224        })
1225    }
1226
1227    fn outputs_map(&self) -> JsonMap<String, Value> {
1228        let mut outputs = JsonMap::new();
1229        for (id, output) in &self.nodes {
1230            outputs.insert(id.clone(), output.payload.clone());
1231        }
1232        outputs
1233    }
1234    fn push_egress(&mut self, payload: Value) {
1235        self.egress.push(payload);
1236    }
1237
1238    fn replace_input(&mut self, input: Value) {
1239        self.input = input;
1240    }
1241
1242    fn clear_egress(&mut self) {
1243        self.egress.clear();
1244    }
1245
1246    fn redirect_count(&self) -> u32 {
1247        self.redirect_count
1248    }
1249
1250    fn increment_redirect_count(&mut self) {
1251        self.redirect_count = self.redirect_count.saturating_add(1);
1252    }
1253
1254    fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
1255        if self.egress.is_empty() {
1256            return final_payload.unwrap_or(Value::Null);
1257        }
1258        let mut emitted = std::mem::take(&mut self.egress);
1259        if let Some(value) = final_payload {
1260            match value {
1261                Value::Null => {}
1262                Value::Array(items) => emitted.extend(items),
1263                other => emitted.push(other),
1264            }
1265        }
1266        Value::Array(emitted)
1267    }
1268}
1269
1270#[derive(Clone, Debug, Serialize, Deserialize)]
1271struct NodeOutput {
1272    ok: bool,
1273    payload: Value,
1274    meta: Value,
1275}
1276
1277impl NodeOutput {
1278    fn new(payload: Value) -> Self {
1279        Self {
1280            ok: true,
1281            payload,
1282            meta: Value::Null,
1283        }
1284    }
1285}
1286
1287struct DispatchOutcome {
1288    output: NodeOutput,
1289    control: NodeControl,
1290}
1291
1292impl DispatchOutcome {
1293    fn complete(output: NodeOutput) -> Self {
1294        Self {
1295            output,
1296            control: NodeControl::Continue,
1297        }
1298    }
1299
1300    fn wait(output: NodeOutput, reason: Option<String>) -> Self {
1301        Self {
1302            output,
1303            control: NodeControl::Wait { reason },
1304        }
1305    }
1306
1307    fn with_control(output: NodeOutput, control: NodeControl) -> Self {
1308        Self { output, control }
1309    }
1310}
1311
1312#[derive(Clone, Debug)]
1313enum NodeControl {
1314    Continue,
1315    Wait {
1316        reason: Option<String>,
1317    },
1318    Jump(JumpControl),
1319    Respond {
1320        text: Option<String>,
1321        card_cbor: Option<Vec<u8>>,
1322        needs_user: Option<bool>,
1323    },
1324}
1325
1326#[derive(Clone, Debug)]
1327struct JumpControl {
1328    flow: String,
1329    node: Option<String>,
1330    payload: Value,
1331    hints: Value,
1332    max_redirects: Option<u32>,
1333    reason: Option<String>,
1334}
1335
1336#[derive(Clone, Debug)]
1337struct JumpTarget {
1338    flow_id: String,
1339    flow: HostFlow,
1340    node_id: NodeId,
1341}
1342
1343impl NodeOutput {
1344    fn with_meta(payload: Value, meta: Value) -> Self {
1345        Self {
1346            ok: true,
1347            payload,
1348            meta,
1349        }
1350    }
1351}
1352
1353fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
1354    ComponentExecCtx {
1355        tenant: ComponentTenantCtx {
1356            tenant: ctx.tenant.to_string(),
1357            team: None,
1358            user: ctx.provider_id.map(str::to_string),
1359            trace_id: None,
1360            i18n_id: None,
1361            correlation_id: ctx.session_id.map(str::to_string),
1362            deadline_unix_ms: None,
1363            attempt: ctx.attempt,
1364            idempotency_key: ctx.session_id.map(str::to_string),
1365        },
1366        i18n_id: None,
1367        flow_id: ctx.flow_id.to_string(),
1368        node_id: Some(node_id.to_string()),
1369    }
1370}
1371
1372fn component_error(value: &Value) -> Option<(String, String)> {
1373    let obj = value.as_object()?;
1374    let ok = obj.get("ok").and_then(Value::as_bool)?;
1375    if ok {
1376        return None;
1377    }
1378    let err = obj.get("error")?.as_object()?;
1379    let code = err
1380        .get("code")
1381        .and_then(Value::as_str)
1382        .unwrap_or("component_error");
1383    let message = err
1384        .get("message")
1385        .and_then(Value::as_str)
1386        .unwrap_or("component reported error");
1387    Some((code.to_string(), message.to_string()))
1388}
1389
1390fn extract_wait_reason(payload: &Value) -> Option<String> {
1391    match payload {
1392        Value::String(s) => Some(s.clone()),
1393        Value::Object(map) => map
1394            .get("reason")
1395            .and_then(Value::as_str)
1396            .map(|value| value.to_string()),
1397        _ => None,
1398    }
1399}
1400
1401fn component_dispatch_outcome(output: NodeOutput) -> Result<DispatchOutcome> {
1402    if let Some(control) = parse_component_control(&output.payload)? {
1403        return Ok(match control {
1404            NodeControl::Jump(jump) => {
1405                let adjusted = NodeOutput::with_meta(jump.payload.clone(), jump.hints.clone());
1406                DispatchOutcome::with_control(adjusted, NodeControl::Jump(jump))
1407            }
1408            NodeControl::Respond {
1409                text,
1410                card_cbor,
1411                needs_user,
1412            } => DispatchOutcome::with_control(
1413                output,
1414                NodeControl::Respond {
1415                    text,
1416                    card_cbor,
1417                    needs_user,
1418                },
1419            ),
1420            other => DispatchOutcome::with_control(output, other),
1421        });
1422    }
1423    Ok(DispatchOutcome::complete(output))
1424}
1425
1426fn parse_component_control(payload: &Value) -> Result<Option<NodeControl>> {
1427    let Value::Object(map) = payload else {
1428        return Ok(None);
1429    };
1430    let Some(control_value) = map.get("greentic_control") else {
1431        return Ok(None);
1432    };
1433    let control = control_value
1434        .as_object()
1435        .ok_or_else(|| anyhow!("jump_failed: greentic_control must be an object"))?;
1436    let action = control
1437        .get("action")
1438        .and_then(Value::as_str)
1439        .ok_or_else(|| anyhow!("jump_failed: greentic_control.action is required"))?;
1440    let version = control
1441        .get("v")
1442        .and_then(Value::as_u64)
1443        .ok_or_else(|| anyhow!("jump_failed: greentic_control.v is required"))?;
1444    if version != 1 {
1445        bail!("jump_failed: unsupported greentic_control.v={version}");
1446    }
1447
1448    match action {
1449        "jump" => {
1450            let flow = control
1451                .get("flow")
1452                .and_then(Value::as_str)
1453                .map(str::trim)
1454                .filter(|value| !value.is_empty())
1455                .ok_or_else(|| anyhow!("jump_failed: jump flow is required"))?
1456                .to_string();
1457            let node = control
1458                .get("node")
1459                .and_then(Value::as_str)
1460                .map(str::trim)
1461                .filter(|value| !value.is_empty())
1462                .map(str::to_string);
1463            let payload = control.get("payload").cloned().unwrap_or(Value::Null);
1464            let hints = control.get("hints").cloned().unwrap_or(Value::Null);
1465            let max_redirects = control
1466                .get("max_redirects")
1467                .and_then(Value::as_u64)
1468                .and_then(|value| u32::try_from(value).ok());
1469            let reason = control
1470                .get("reason")
1471                .and_then(Value::as_str)
1472                .map(str::to_string);
1473            Ok(Some(NodeControl::Jump(JumpControl {
1474                flow,
1475                node,
1476                payload,
1477                hints,
1478                max_redirects,
1479                reason,
1480            })))
1481        }
1482        "respond" => {
1483            let text = control
1484                .get("text")
1485                .and_then(Value::as_str)
1486                .map(str::to_string);
1487            let card_cbor = control
1488                .get("card_cbor")
1489                .and_then(Value::as_array)
1490                .map(|bytes| {
1491                    bytes
1492                        .iter()
1493                        .filter_map(Value::as_u64)
1494                        .filter_map(|value| u8::try_from(value).ok())
1495                        .collect::<Vec<_>>()
1496                });
1497            let needs_user = control.get("needs_user").and_then(Value::as_bool);
1498            Ok(Some(NodeControl::Respond {
1499                text,
1500                card_cbor,
1501                needs_user,
1502            }))
1503        }
1504        _ => Ok(None),
1505    }
1506}
1507
1508fn template_context(state: &ExecutionState, prev: Value) -> Value {
1509    let entry = if state.entry.is_null() {
1510        Value::Object(JsonMap::new())
1511    } else {
1512        state.entry.clone()
1513    };
1514    let mut ctx = JsonMap::new();
1515    ctx.insert("entry".into(), entry.clone());
1516    ctx.insert("in".into(), entry); // alias for entry - used in flow templates
1517    ctx.insert("prev".into(), prev);
1518    ctx.insert("node".into(), Value::Object(state.outputs_map()));
1519    ctx.insert("state".into(), state.context());
1520    Value::Object(ctx)
1521}
1522
1523impl From<Flow> for HostFlow {
1524    fn from(value: Flow) -> Self {
1525        let mut nodes = IndexMap::new();
1526        for (id, node) in value.nodes {
1527            nodes.insert(id.clone(), HostNode::from(node));
1528        }
1529        let start = value
1530            .entrypoints
1531            .get("default")
1532            .and_then(Value::as_str)
1533            .and_then(|id| NodeId::from_str(id).ok())
1534            .or_else(|| nodes.keys().next().cloned());
1535        Self {
1536            id: value.id.as_str().to_string(),
1537            start,
1538            nodes,
1539        }
1540    }
1541}
1542
1543impl From<Node> for HostNode {
1544    fn from(node: Node) -> Self {
1545        let component_ref = node.component.id.as_str().to_string();
1546        let raw_operation = node.component.operation.clone();
1547        let operation_in_mapping = extract_operation_from_mapping(&node.input.mapping);
1548        let operation_is_component_exec = raw_operation.as_deref() == Some("component.exec");
1549        let operation_is_emit = raw_operation
1550            .as_deref()
1551            .map(|op| op.starts_with("emit."))
1552            .unwrap_or(false);
1553        let is_component_exec = component_ref == "component.exec" || operation_is_component_exec;
1554
1555        let kind = if is_component_exec {
1556            let target = if component_ref == "component.exec" {
1557                if let Some(op) = raw_operation
1558                    .as_deref()
1559                    .filter(|op| op.starts_with("emit."))
1560                {
1561                    op.to_string()
1562                } else {
1563                    extract_target_component(&node.input.mapping)
1564                        .unwrap_or_else(|| "component.exec".to_string())
1565                }
1566            } else {
1567                extract_target_component(&node.input.mapping)
1568                    .unwrap_or_else(|| component_ref.clone())
1569            };
1570            if target.starts_with("emit.") {
1571                NodeKind::BuiltinEmit {
1572                    kind: emit_kind_from_ref(&target),
1573                }
1574            } else {
1575                NodeKind::Exec {
1576                    target_component: target,
1577                }
1578            }
1579        } else if operation_is_emit {
1580            NodeKind::BuiltinEmit {
1581                kind: emit_kind_from_ref(raw_operation.as_deref().unwrap_or("emit.log")),
1582            }
1583        } else {
1584            match component_ref.as_str() {
1585                "flow.call" => NodeKind::FlowCall,
1586                "provider.invoke" => NodeKind::ProviderInvoke,
1587                "session.wait" => NodeKind::Wait,
1588                comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
1589                    kind: emit_kind_from_ref(comp),
1590                },
1591                other => NodeKind::PackComponent {
1592                    component_ref: other.to_string(),
1593                },
1594            }
1595        };
1596        let component_label = match &kind {
1597            NodeKind::Exec { .. } => "component.exec".to_string(),
1598            NodeKind::PackComponent { component_ref } => component_ref.clone(),
1599            NodeKind::ProviderInvoke => "provider.invoke".to_string(),
1600            NodeKind::FlowCall => "flow.call".to_string(),
1601            NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
1602            NodeKind::Wait => "session.wait".to_string(),
1603        };
1604        let operation_name = if is_component_exec && operation_is_component_exec {
1605            None
1606        } else {
1607            raw_operation.clone()
1608        };
1609        let payload_expr = match kind {
1610            NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
1611            _ => node.input.mapping.clone(),
1612        };
1613        Self {
1614            kind,
1615            component: component_label,
1616            component_id: if is_component_exec {
1617                "component.exec".to_string()
1618            } else {
1619                component_ref
1620            },
1621            operation_name,
1622            operation_in_mapping,
1623            payload_expr,
1624            routing: node.routing,
1625        }
1626    }
1627}
1628
1629fn extract_target_component(payload: &Value) -> Option<String> {
1630    match payload {
1631        Value::Object(map) => map
1632            .get("component")
1633            .or_else(|| map.get("component_ref"))
1634            .and_then(Value::as_str)
1635            .map(|s| s.to_string()),
1636        _ => None,
1637    }
1638}
1639
1640fn extract_operation_from_mapping(payload: &Value) -> Option<String> {
1641    match payload {
1642        Value::Object(map) => map
1643            .get("operation")
1644            .or_else(|| map.get("op"))
1645            .and_then(Value::as_str)
1646            .map(str::trim)
1647            .filter(|value| !value.is_empty())
1648            .map(|value| value.to_string()),
1649        _ => None,
1650    }
1651}
1652
1653fn extract_emit_payload(payload: &Value) -> Value {
1654    if let Value::Object(map) = payload {
1655        if let Some(input) = map.get("input") {
1656            return input.clone();
1657        }
1658        if let Some(inner) = map.get("payload") {
1659            return inner.clone();
1660        }
1661    }
1662    payload.clone()
1663}
1664
1665fn split_operation_payload(payload: Value) -> (Value, Value) {
1666    if let Value::Object(mut map) = payload.clone()
1667        && map.contains_key("input")
1668    {
1669        let input = map.remove("input").unwrap_or(Value::Null);
1670        let config = map.remove("config").unwrap_or(Value::Null);
1671        let legacy_only = map.keys().all(|key| {
1672            matches!(
1673                key.as_str(),
1674                "operation" | "op" | "component" | "component_ref"
1675            )
1676        });
1677        if legacy_only {
1678            return (input, config);
1679        }
1680    }
1681    (payload, Value::Null)
1682}
1683
1684fn resolve_component_operation(
1685    node_id: &str,
1686    component_label: &str,
1687    payload_operation: Option<String>,
1688    operation_override: Option<&str>,
1689    operation_in_mapping: Option<&str>,
1690) -> Result<String> {
1691    if let Some(op) = operation_override
1692        .map(str::trim)
1693        .filter(|value| !value.is_empty())
1694    {
1695        return Ok(op.to_string());
1696    }
1697
1698    if let Some(op) = payload_operation
1699        .as_deref()
1700        .map(str::trim)
1701        .filter(|value| !value.is_empty())
1702    {
1703        return Ok(op.to_string());
1704    }
1705
1706    let mut message = format!(
1707        "missing operation for node `{}` (component `{}`); expected node.component.operation to be set",
1708        node_id, component_label,
1709    );
1710    if let Some(found) = operation_in_mapping {
1711        message.push_str(&format!(
1712            ". Found operation in input.mapping (`{}`) but this is not used; pack compiler must preserve node.component.operation.",
1713            found
1714        ));
1715    }
1716    bail!(message);
1717}
1718
1719fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
1720    match component_ref {
1721        "emit.log" => EmitKind::Log,
1722        "emit.response" => EmitKind::Response,
1723        other => EmitKind::Other(other.to_string()),
1724    }
1725}
1726
1727fn emit_ref_from_kind(kind: &EmitKind) -> String {
1728    match kind {
1729        EmitKind::Log => "emit.log".to_string(),
1730        EmitKind::Response => "emit.response".to_string(),
1731        EmitKind::Other(other) => other.clone(),
1732    }
1733}
1734
1735/// Returns `true` when `input` looks like an Adaptive Card invocation
1736/// (contains `card_source` or `card_spec` at the top level).
1737fn is_card_invocation(input: &Value) -> bool {
1738    if let Value::Object(map) = input {
1739        return map.contains_key("card_source") || map.contains_key("card_spec");
1740    }
1741    false
1742}
1743
1744/// Pre-resolve `card_source: "asset"` entries by reading the referenced JSON
1745/// file from the pack's assets directory and converting to
1746/// `card_source: "inline"` with `inline_json` populated.
1747///
1748/// This handles both top-level card fields and the nested `call.payload`
1749/// structure emitted by cards2pack.
1750fn resolve_card_assets(input: &mut Value, pack: &crate::pack::PackRuntime) {
1751    resolve_card_spec_asset(input, pack);
1752
1753    // Also resolve inside `call.payload` (cards2pack duplicates the card
1754    // invocation there).
1755    if let Value::Object(map) = input
1756        && let Some(Value::Object(call)) = map.get_mut("call")
1757        && let Some(payload) = call.get_mut("payload")
1758    {
1759        resolve_card_spec_asset(payload, pack);
1760    }
1761}
1762
1763/// Resolve a single card_spec asset_path → inline_json.
1764fn resolve_card_spec_asset(value: &mut Value, pack: &crate::pack::PackRuntime) {
1765    let Value::Object(map) = value else { return };
1766
1767    let is_asset = map
1768        .get("card_source")
1769        .and_then(Value::as_str)
1770        .map(|s| s.eq_ignore_ascii_case("asset"))
1771        .unwrap_or(false);
1772    if !is_asset {
1773        return;
1774    }
1775
1776    let asset_path = map
1777        .get("card_spec")
1778        .and_then(|spec| spec.get("asset_path"))
1779        .and_then(Value::as_str)
1780        .map(str::to_string);
1781
1782    let Some(asset_path) = asset_path else { return };
1783
1784    match pack.read_asset(&asset_path) {
1785        Ok(bytes) => {
1786            let card_json: Value = match serde_json::from_slice(&bytes) {
1787                Ok(v) => v,
1788                Err(err) => {
1789                    tracing::warn!(
1790                        asset_path,
1791                        %err,
1792                        "failed to parse card asset as JSON; leaving as asset reference"
1793                    );
1794                    return;
1795                }
1796            };
1797            tracing::debug!(asset_path, "pre-resolved card asset to inline_json");
1798            map.insert("card_source".into(), Value::String("inline".into()));
1799            if let Some(Value::Object(spec)) = map.get_mut("card_spec") {
1800                spec.insert("inline_json".into(), card_json);
1801                spec.remove("asset_path");
1802            }
1803        }
1804        Err(err) => {
1805            tracing::warn!(
1806                asset_path,
1807                %err,
1808                "card asset not found in pack; leaving as asset reference"
1809            );
1810        }
1811    }
1812}
1813
1814#[cfg(test)]
1815mod tests {
1816    use super::*;
1817    use crate::validate::{ValidationConfig, ValidationMode};
1818    use greentic_types::{
1819        Flow, FlowComponentRef, FlowId, FlowKind, InputMapping, Node, NodeId, OutputMapping,
1820        Routing, TelemetryHints,
1821    };
1822    use serde_json::json;
1823    use std::collections::BTreeMap;
1824    use std::str::FromStr;
1825    use std::sync::Mutex;
1826    use tokio::runtime::Runtime;
1827
1828    fn minimal_engine() -> FlowEngine {
1829        FlowEngine {
1830            packs: Vec::new(),
1831            flows: Vec::new(),
1832            flow_sources: HashMap::new(),
1833            flow_cache: RwLock::new(HashMap::new()),
1834            default_env: "local".to_string(),
1835            validation: ValidationConfig {
1836                mode: ValidationMode::Off,
1837            },
1838        }
1839    }
1840
1841    #[test]
1842    fn templating_renders_with_partials_and_data() {
1843        let mut state = ExecutionState::new(json!({ "city": "London" }));
1844        state.nodes.insert(
1845            "forecast".to_string(),
1846            NodeOutput::new(json!({ "temp": "20C" })),
1847        );
1848
1849        // templating context includes node outputs for runner-side payload rendering.
1850        let ctx = state.context();
1851        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
1852    }
1853
1854    #[test]
1855    fn finalize_wraps_emitted_payloads() {
1856        let mut state = ExecutionState::new(json!({}));
1857        state.push_egress(json!({ "text": "first" }));
1858        state.push_egress(json!({ "text": "second" }));
1859        let result = state.finalize_with(Some(json!({ "text": "final" })));
1860        assert_eq!(
1861            result,
1862            json!([
1863                { "text": "first" },
1864                { "text": "second" },
1865                { "text": "final" }
1866            ])
1867        );
1868    }
1869
1870    #[test]
1871    fn finalize_flattens_final_array() {
1872        let mut state = ExecutionState::new(json!({}));
1873        state.push_egress(json!({ "text": "only" }));
1874        let result = state.finalize_with(Some(json!([
1875            { "text": "extra-1" },
1876            { "text": "extra-2" }
1877        ])));
1878        assert_eq!(
1879            result,
1880            json!([
1881                { "text": "only" },
1882                { "text": "extra-1" },
1883                { "text": "extra-2" }
1884            ])
1885        );
1886    }
1887
1888    #[test]
1889    fn parse_component_control_ignores_plain_payload() {
1890        let payload = json!({
1891            "flow": "not-a-control-field",
1892            "node": "n1"
1893        });
1894        let control = parse_component_control(&payload).expect("parse control");
1895        assert!(control.is_none());
1896    }
1897
1898    #[test]
1899    fn parse_component_control_parses_jump_marker() {
1900        let payload = json!({
1901            "greentic_control": {
1902                "action": "jump",
1903                "v": 1,
1904                "flow": "flow.b",
1905                "node": "node-2",
1906                "payload": { "message": "hi" },
1907                "hints": { "k": "v" },
1908                "max_redirects": 2,
1909                "reason": "handoff"
1910            }
1911        });
1912        let control = parse_component_control(&payload)
1913            .expect("parse control")
1914            .expect("missing control");
1915        match control {
1916            NodeControl::Jump(jump) => {
1917                assert_eq!(jump.flow, "flow.b");
1918                assert_eq!(jump.node.as_deref(), Some("node-2"));
1919                assert_eq!(jump.payload, json!({ "message": "hi" }));
1920                assert_eq!(jump.hints, json!({ "k": "v" }));
1921                assert_eq!(jump.max_redirects, Some(2));
1922                assert_eq!(jump.reason.as_deref(), Some("handoff"));
1923            }
1924            other => panic!("expected jump control, got {other:?}"),
1925        }
1926    }
1927
1928    #[test]
1929    fn parse_component_control_rejects_invalid_marker() {
1930        let payload = json!({
1931            "greentic_control": "bad-shape"
1932        });
1933        let err = parse_component_control(&payload).expect_err("expected invalid marker error");
1934        assert!(err.to_string().contains("greentic_control"));
1935    }
1936
1937    #[test]
1938    fn missing_operation_reports_node_and_component() {
1939        let engine = minimal_engine();
1940        let rt = Runtime::new().unwrap();
1941        let retry_config = RetryConfig {
1942            max_attempts: 1,
1943            base_delay_ms: 1,
1944        };
1945        let ctx = FlowContext {
1946            tenant: "tenant",
1947            pack_id: "test-pack",
1948            flow_id: "flow",
1949            node_id: Some("missing-op"),
1950            tool: None,
1951            action: None,
1952            session_id: None,
1953            provider_id: None,
1954            retry_config,
1955            attempt: 1,
1956            observer: None,
1957            mocks: None,
1958        };
1959        let node = HostNode {
1960            kind: NodeKind::Exec {
1961                target_component: "qa.process".into(),
1962            },
1963            component: "component.exec".into(),
1964            component_id: "component.exec".into(),
1965            operation_name: None,
1966            operation_in_mapping: None,
1967            payload_expr: Value::Null,
1968            routing: Routing::End,
1969        };
1970        let _state = ExecutionState::new(Value::Null);
1971        let payload = json!({ "component": "qa.process" });
1972        let event = NodeEvent {
1973            context: &ctx,
1974            node_id: "missing-op",
1975            node: &node,
1976            payload: &payload,
1977        };
1978        let err = rt
1979            .block_on(engine.execute_component_exec(
1980                &ctx,
1981                "missing-op",
1982                &node,
1983                payload.clone(),
1984                &event,
1985                ComponentOverrides {
1986                    component: None,
1987                    operation: None,
1988                },
1989            ))
1990            .unwrap_err();
1991        let message = err.to_string();
1992        assert!(
1993            message.contains("missing operation for node `missing-op`"),
1994            "unexpected message: {message}"
1995        );
1996        assert!(
1997            message.contains("(component `component.exec`)"),
1998            "unexpected message: {message}"
1999        );
2000    }
2001
2002    #[test]
2003    fn missing_operation_mentions_mapping_hint() {
2004        let engine = minimal_engine();
2005        let rt = Runtime::new().unwrap();
2006        let retry_config = RetryConfig {
2007            max_attempts: 1,
2008            base_delay_ms: 1,
2009        };
2010        let ctx = FlowContext {
2011            tenant: "tenant",
2012            pack_id: "test-pack",
2013            flow_id: "flow",
2014            node_id: Some("missing-op-hint"),
2015            tool: None,
2016            action: None,
2017            session_id: None,
2018            provider_id: None,
2019            retry_config,
2020            attempt: 1,
2021            observer: None,
2022            mocks: None,
2023        };
2024        let node = HostNode {
2025            kind: NodeKind::Exec {
2026                target_component: "qa.process".into(),
2027            },
2028            component: "component.exec".into(),
2029            component_id: "component.exec".into(),
2030            operation_name: None,
2031            operation_in_mapping: Some("render".into()),
2032            payload_expr: Value::Null,
2033            routing: Routing::End,
2034        };
2035        let _state = ExecutionState::new(Value::Null);
2036        let payload = json!({ "component": "qa.process" });
2037        let event = NodeEvent {
2038            context: &ctx,
2039            node_id: "missing-op-hint",
2040            node: &node,
2041            payload: &payload,
2042        };
2043        let err = rt
2044            .block_on(engine.execute_component_exec(
2045                &ctx,
2046                "missing-op-hint",
2047                &node,
2048                payload.clone(),
2049                &event,
2050                ComponentOverrides {
2051                    component: None,
2052                    operation: None,
2053                },
2054            ))
2055            .unwrap_err();
2056        let message = err.to_string();
2057        assert!(
2058            message.contains("missing operation for node `missing-op-hint`"),
2059            "unexpected message: {message}"
2060        );
2061        assert!(
2062            message.contains("Found operation in input.mapping (`render`)"),
2063            "unexpected message: {message}"
2064        );
2065    }
2066
2067    struct CountingObserver {
2068        starts: Mutex<Vec<String>>,
2069        ends: Mutex<Vec<Value>>,
2070    }
2071
2072    impl CountingObserver {
2073        fn new() -> Self {
2074            Self {
2075                starts: Mutex::new(Vec::new()),
2076                ends: Mutex::new(Vec::new()),
2077            }
2078        }
2079    }
2080
2081    impl ExecutionObserver for CountingObserver {
2082        fn on_node_start(&self, event: &NodeEvent<'_>) {
2083            self.starts.lock().unwrap().push(event.node_id.to_string());
2084        }
2085
2086        fn on_node_end(&self, _event: &NodeEvent<'_>, output: &Value) {
2087            self.ends.lock().unwrap().push(output.clone());
2088        }
2089
2090        fn on_node_error(&self, _event: &NodeEvent<'_>, _error: &dyn StdError) {}
2091    }
2092
2093    #[test]
2094    fn emits_end_event_for_successful_node() {
2095        let node_id = NodeId::from_str("emit").unwrap();
2096        let node = Node {
2097            id: node_id.clone(),
2098            component: FlowComponentRef {
2099                id: "emit.log".parse().unwrap(),
2100                pack_alias: None,
2101                operation: None,
2102            },
2103            input: InputMapping {
2104                mapping: json!({ "message": "logged" }),
2105            },
2106            output: OutputMapping {
2107                mapping: Value::Null,
2108            },
2109            routing: Routing::End,
2110            telemetry: TelemetryHints::default(),
2111        };
2112        let mut nodes = indexmap::IndexMap::default();
2113        nodes.insert(node_id.clone(), node);
2114        let flow = Flow {
2115            schema_version: "1.0".into(),
2116            id: FlowId::from_str("emit.flow").unwrap(),
2117            kind: FlowKind::Messaging,
2118            entrypoints: BTreeMap::from([(
2119                "default".to_string(),
2120                Value::String(node_id.to_string()),
2121            )]),
2122            nodes,
2123            metadata: Default::default(),
2124        };
2125        let host_flow = HostFlow::from(flow);
2126
2127        let engine = FlowEngine {
2128            packs: Vec::new(),
2129            flows: Vec::new(),
2130            flow_sources: HashMap::new(),
2131            flow_cache: RwLock::new(HashMap::from([(
2132                FlowKey {
2133                    pack_id: "test-pack".to_string(),
2134                    flow_id: "emit.flow".to_string(),
2135                },
2136                host_flow,
2137            )])),
2138            default_env: "local".to_string(),
2139            validation: ValidationConfig {
2140                mode: ValidationMode::Off,
2141            },
2142        };
2143        let observer = CountingObserver::new();
2144        let ctx = FlowContext {
2145            tenant: "demo",
2146            pack_id: "test-pack",
2147            flow_id: "emit.flow",
2148            node_id: None,
2149            tool: None,
2150            action: None,
2151            session_id: None,
2152            provider_id: None,
2153            retry_config: RetryConfig {
2154                max_attempts: 1,
2155                base_delay_ms: 1,
2156            },
2157            attempt: 1,
2158            observer: Some(&observer),
2159            mocks: None,
2160        };
2161
2162        let rt = Runtime::new().unwrap();
2163        let result = rt.block_on(engine.execute(ctx, Value::Null)).unwrap();
2164        assert!(matches!(result.status, FlowStatus::Completed));
2165
2166        let starts = observer.starts.lock().unwrap();
2167        let ends = observer.ends.lock().unwrap();
2168        assert_eq!(starts.len(), 1);
2169        assert_eq!(ends.len(), 1);
2170        assert_eq!(ends[0], json!({ "message": "logged" }));
2171    }
2172
2173    fn host_flow_for_test(
2174        flow_id: &str,
2175        node_ids: &[&str],
2176        default_start: Option<&str>,
2177    ) -> HostFlow {
2178        let mut nodes = indexmap::IndexMap::default();
2179        for node_id in node_ids {
2180            let id = NodeId::from_str(node_id).unwrap();
2181            let node = Node {
2182                id: id.clone(),
2183                component: FlowComponentRef {
2184                    id: "emit.log".parse().unwrap(),
2185                    pack_alias: None,
2186                    operation: None,
2187                },
2188                input: InputMapping {
2189                    mapping: json!({ "message": node_id }),
2190                },
2191                output: OutputMapping {
2192                    mapping: Value::Null,
2193                },
2194                routing: Routing::End,
2195                telemetry: TelemetryHints::default(),
2196            };
2197            nodes.insert(id, node);
2198        }
2199        let mut entrypoints = BTreeMap::new();
2200        if let Some(start) = default_start {
2201            entrypoints.insert("default".to_string(), Value::String(start.to_string()));
2202        }
2203        HostFlow::from(Flow {
2204            schema_version: "1.0".into(),
2205            id: FlowId::from_str(flow_id).unwrap(),
2206            kind: FlowKind::Messaging,
2207            entrypoints,
2208            nodes,
2209            metadata: Default::default(),
2210        })
2211    }
2212
2213    fn jump_test_engine() -> FlowEngine {
2214        let target_flow = host_flow_for_test("flow.target", &["node-a", "node-b"], None);
2215        FlowEngine {
2216            packs: Vec::new(),
2217            flows: Vec::new(),
2218            flow_sources: HashMap::new(),
2219            flow_cache: RwLock::new(HashMap::from([(
2220                FlowKey {
2221                    pack_id: "test-pack".to_string(),
2222                    flow_id: "flow.target".to_string(),
2223                },
2224                target_flow,
2225            )])),
2226            default_env: "local".to_string(),
2227            validation: ValidationConfig {
2228                mode: ValidationMode::Off,
2229            },
2230        }
2231    }
2232
2233    fn jump_ctx<'a>(flow_id: &'a str) -> FlowContext<'a> {
2234        FlowContext {
2235            tenant: "demo",
2236            pack_id: "test-pack",
2237            flow_id,
2238            node_id: None,
2239            tool: None,
2240            action: None,
2241            session_id: None,
2242            provider_id: None,
2243            retry_config: RetryConfig {
2244                max_attempts: 1,
2245                base_delay_ms: 1,
2246            },
2247            attempt: 1,
2248            observer: None,
2249            mocks: None,
2250        }
2251    }
2252
2253    #[test]
2254    fn apply_jump_unknown_flow_errors() {
2255        let engine = minimal_engine();
2256        let mut state = ExecutionState::new(Value::Null);
2257        let rt = Runtime::new().unwrap();
2258        let err = rt
2259            .block_on(engine.apply_jump(
2260                &jump_ctx("flow.source"),
2261                &mut state,
2262                JumpControl {
2263                    flow: "flow.missing".into(),
2264                    node: None,
2265                    payload: json!({ "ok": true }),
2266                    hints: Value::Null,
2267                    max_redirects: None,
2268                    reason: None,
2269                },
2270            ))
2271            .unwrap_err();
2272        assert!(
2273            err.to_string().contains("unknown_flow"),
2274            "unexpected error: {err}"
2275        );
2276    }
2277
2278    #[test]
2279    fn apply_jump_unknown_node_errors() {
2280        let engine = jump_test_engine();
2281        let mut state = ExecutionState::new(Value::Null);
2282        let rt = Runtime::new().unwrap();
2283        let err = rt
2284            .block_on(engine.apply_jump(
2285                &jump_ctx("flow.source"),
2286                &mut state,
2287                JumpControl {
2288                    flow: "flow.target".into(),
2289                    node: Some("node-missing".into()),
2290                    payload: json!({ "ok": true }),
2291                    hints: Value::Null,
2292                    max_redirects: None,
2293                    reason: None,
2294                },
2295            ))
2296            .unwrap_err();
2297        assert!(
2298            err.to_string().contains("unknown_node"),
2299            "unexpected error: {err}"
2300        );
2301    }
2302
2303    #[test]
2304    fn apply_jump_uses_default_start_fallback() {
2305        let engine = jump_test_engine();
2306        let mut state = ExecutionState::new(Value::Null);
2307        let rt = Runtime::new().unwrap();
2308        let target = rt
2309            .block_on(engine.apply_jump(
2310                &jump_ctx("flow.source"),
2311                &mut state,
2312                JumpControl {
2313                    flow: "flow.target".into(),
2314                    node: None,
2315                    payload: json!({ "k": "v" }),
2316                    hints: Value::Null,
2317                    max_redirects: None,
2318                    reason: None,
2319                },
2320            ))
2321            .expect("jump target");
2322        assert_eq!(target.flow_id, "flow.target");
2323        assert_eq!(target.node_id.as_str(), "node-a");
2324    }
2325
2326    #[test]
2327    fn apply_jump_redirect_limit_enforced() {
2328        let engine = jump_test_engine();
2329        let mut state = ExecutionState::new(Value::Null);
2330        state.redirect_count = 3;
2331        let rt = Runtime::new().unwrap();
2332        let err = rt
2333            .block_on(engine.apply_jump(
2334                &jump_ctx("flow.source"),
2335                &mut state,
2336                JumpControl {
2337                    flow: "flow.target".into(),
2338                    node: None,
2339                    payload: json!({ "k": "v" }),
2340                    hints: Value::Null,
2341                    max_redirects: Some(3),
2342                    reason: None,
2343                },
2344            ))
2345            .unwrap_err();
2346        assert_eq!(err.to_string(), "redirect_limit");
2347    }
2348}
2349
2350use tracing::Instrument;
2351
2352pub struct FlowContext<'a> {
2353    pub tenant: &'a str,
2354    pub pack_id: &'a str,
2355    pub flow_id: &'a str,
2356    pub node_id: Option<&'a str>,
2357    pub tool: Option<&'a str>,
2358    pub action: Option<&'a str>,
2359    pub session_id: Option<&'a str>,
2360    pub provider_id: Option<&'a str>,
2361    pub retry_config: RetryConfig,
2362    pub attempt: u32,
2363    pub observer: Option<&'a dyn ExecutionObserver>,
2364    pub mocks: Option<&'a MockLayer>,
2365}
2366
2367#[derive(Copy, Clone)]
2368pub struct RetryConfig {
2369    pub max_attempts: u32,
2370    pub base_delay_ms: u64,
2371}
2372
2373fn should_retry(err: &anyhow::Error) -> bool {
2374    let lower = err.to_string().to_lowercase();
2375    lower.contains("transient")
2376        || lower.contains("unavailable")
2377        || lower.contains("internal")
2378        || lower.contains("timeout")
2379}
2380
2381impl From<FlowRetryConfig> for RetryConfig {
2382    fn from(value: FlowRetryConfig) -> Self {
2383        Self {
2384            max_attempts: value.max_attempts.max(1),
2385            base_delay_ms: value.base_delay_ms.max(50),
2386        }
2387    }
2388}