greentic_runner_host/runner/
engine.rs

1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use crate::config::{FlowRetryConfig, HostConfig};
18use crate::pack::{FlowDescriptor, PackRuntime};
19use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
20use greentic_types::{Flow, Node, NodeId, Routing};
21
22pub struct FlowEngine {
23    packs: Vec<Arc<PackRuntime>>,
24    flows: Vec<FlowDescriptor>,
25    flow_sources: HashMap<String, usize>,
26    flow_cache: RwLock<HashMap<String, HostFlow>>,
27    default_env: String,
28}
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
31pub struct FlowSnapshot {
32    pub flow_id: String,
33    pub next_node: String,
34    pub state: ExecutionState,
35}
36
37#[derive(Clone, Debug)]
38pub struct FlowWait {
39    pub reason: Option<String>,
40    pub snapshot: FlowSnapshot,
41}
42
43#[derive(Clone, Debug)]
44pub enum FlowStatus {
45    Completed,
46    Waiting(FlowWait),
47}
48
49#[derive(Clone, Debug)]
50pub struct FlowExecution {
51    pub output: Value,
52    pub status: FlowStatus,
53}
54
55#[derive(Clone, Debug)]
56struct HostFlow {
57    id: String,
58    start: Option<NodeId>,
59    nodes: IndexMap<NodeId, HostNode>,
60}
61
62#[derive(Clone, Debug)]
63pub struct HostNode {
64    kind: NodeKind,
65    /// Backwards-compatible component label for observers/transcript.
66    pub component: String,
67    operation_in_mapping: Option<String>,
68    payload_expr: Value,
69    routing: Routing,
70}
71
72#[derive(Clone, Debug)]
73enum NodeKind {
74    Exec { target_component: String },
75    PackComponent { component_ref: String },
76    ProviderInvoke,
77    FlowCall,
78    BuiltinEmit { kind: EmitKind },
79    Wait,
80}
81
82#[derive(Clone, Debug)]
83enum EmitKind {
84    Log,
85    Response,
86    Other(String),
87}
88
89impl FlowExecution {
90    fn completed(output: Value) -> Self {
91        Self {
92            output,
93            status: FlowStatus::Completed,
94        }
95    }
96
97    fn waiting(output: Value, wait: FlowWait) -> Self {
98        Self {
99            output,
100            status: FlowStatus::Waiting(wait),
101        }
102    }
103}
104
105impl FlowEngine {
106    pub async fn new(packs: Vec<Arc<PackRuntime>>, _config: Arc<HostConfig>) -> Result<Self> {
107        let mut flow_sources = HashMap::new();
108        let mut descriptors = Vec::new();
109        for (idx, pack) in packs.iter().enumerate() {
110            let flows = pack.list_flows().await?;
111            for flow in flows {
112                tracing::info!(
113                    flow_id = %flow.id,
114                    flow_type = %flow.flow_type,
115                    pack_index = idx,
116                    "registered flow"
117                );
118                flow_sources.insert(flow.id.clone(), idx);
119                descriptors.retain(|existing: &FlowDescriptor| existing.id != flow.id);
120                descriptors.push(flow);
121            }
122        }
123
124        let mut flow_map = HashMap::new();
125        for flow in &descriptors {
126            if let Some(&pack_idx) = flow_sources.get(&flow.id) {
127                let pack_clone = Arc::clone(&packs[pack_idx]);
128                let flow_id = flow.id.clone();
129                let task_flow_id = flow_id.clone();
130                match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
131                    Ok(Ok(flow)) => {
132                        flow_map.insert(flow_id, HostFlow::from(flow));
133                    }
134                    Ok(Err(err)) => {
135                        tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
136                    }
137                    Err(err) => {
138                        tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
139                    }
140                }
141            }
142        }
143
144        Ok(Self {
145            packs,
146            flows: descriptors,
147            flow_sources,
148            flow_cache: RwLock::new(flow_map),
149            default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
150        })
151    }
152
153    async fn get_or_load_flow(&self, flow_id: &str) -> Result<HostFlow> {
154        if let Some(flow) = self.flow_cache.read().get(flow_id).cloned() {
155            return Ok(flow);
156        }
157
158        let pack_idx = *self
159            .flow_sources
160            .get(flow_id)
161            .with_context(|| format!("flow {flow_id} not registered"))?;
162        let pack = Arc::clone(&self.packs[pack_idx]);
163        let flow_id_owned = flow_id.to_string();
164        let task_flow_id = flow_id_owned.clone();
165        let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
166            .await
167            .context("failed to join flow metadata task")??;
168        let host_flow = HostFlow::from(flow);
169        self.flow_cache
170            .write()
171            .insert(flow_id_owned.clone(), host_flow.clone());
172        Ok(host_flow)
173    }
174
175    pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
176        let span = tracing::info_span!(
177            "flow.execute",
178            tenant = tracing::field::Empty,
179            flow_id = tracing::field::Empty,
180            node_id = tracing::field::Empty,
181            tool = tracing::field::Empty,
182            action = tracing::field::Empty
183        );
184        annotate_span(
185            &span,
186            &FlowSpanAttributes {
187                tenant: ctx.tenant,
188                flow_id: ctx.flow_id,
189                node_id: ctx.node_id,
190                tool: ctx.tool,
191                action: ctx.action,
192            },
193        );
194        set_flow_context(
195            &self.default_env,
196            ctx.tenant,
197            ctx.flow_id,
198            ctx.node_id,
199            ctx.provider_id,
200            ctx.session_id,
201        );
202        let retry_config = ctx.retry_config;
203        let original_input = input;
204        async move {
205            let mut attempt = 0u32;
206            loop {
207                attempt += 1;
208                match self.execute_once(&ctx, original_input.clone()).await {
209                    Ok(value) => return Ok(value),
210                    Err(err) => {
211                        if attempt >= retry_config.max_attempts || !should_retry(&err) {
212                            return Err(err);
213                        }
214                        let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
215                        tracing::warn!(
216                            tenant = ctx.tenant,
217                            flow_id = ctx.flow_id,
218                            attempt,
219                            max_attempts = retry_config.max_attempts,
220                            delay_ms = delay,
221                            error = %err,
222                            "transient flow execution failure, backing off"
223                        );
224                        tokio::time::sleep(Duration::from_millis(delay)).await;
225                    }
226                }
227            }
228        }
229        .instrument(span)
230        .await
231    }
232
233    pub async fn resume(
234        &self,
235        ctx: FlowContext<'_>,
236        snapshot: FlowSnapshot,
237        input: Value,
238    ) -> Result<FlowExecution> {
239        if snapshot.flow_id != ctx.flow_id {
240            bail!(
241                "snapshot flow {} does not match requested {}",
242                snapshot.flow_id,
243                ctx.flow_id
244            );
245        }
246        let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
247        let mut state = snapshot.state;
248        state.replace_input(input);
249        self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
250            .await
251    }
252
253    async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
254        let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
255        let state = ExecutionState::new(input);
256        self.drive_flow(ctx, flow_ir, state, None).await
257    }
258
259    async fn drive_flow(
260        &self,
261        ctx: &FlowContext<'_>,
262        flow_ir: HostFlow,
263        mut state: ExecutionState,
264        resume_from: Option<String>,
265    ) -> Result<FlowExecution> {
266        let mut current = match resume_from {
267            Some(node) => NodeId::from_str(&node)
268                .with_context(|| format!("invalid resume node id `{node}`"))?,
269            None => flow_ir
270                .start
271                .clone()
272                .or_else(|| flow_ir.nodes.keys().next().cloned())
273                .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
274        };
275
276        loop {
277            let node = flow_ir
278                .nodes
279                .get(&current)
280                .with_context(|| format!("node {} not found", current.as_str()))?;
281
282            let payload = node.payload_expr.clone();
283            let observed_payload = payload.clone();
284            let node_id = current.clone();
285            let event = NodeEvent {
286                context: ctx,
287                node_id: node_id.as_str(),
288                node,
289                payload: &observed_payload,
290            };
291            if let Some(observer) = ctx.observer {
292                observer.on_node_start(&event);
293            }
294            let DispatchOutcome {
295                output,
296                wait_reason,
297            } = self
298                .dispatch_node(ctx, node_id.as_str(), node, &mut state, payload)
299                .await?;
300
301            state.nodes.insert(node_id.clone().into(), output.clone());
302
303            let (next, should_exit) = match &node.routing {
304                Routing::Next { node_id } => (Some(node_id.clone()), false),
305                Routing::End | Routing::Reply => (None, true),
306                Routing::Branch { default, .. } => (default.clone(), default.is_none()),
307                Routing::Custom(raw) => {
308                    tracing::warn!(
309                        flow_id = %flow_ir.id,
310                        node_id = %node_id,
311                        routing = ?raw,
312                        "unsupported routing; terminating flow"
313                    );
314                    (None, true)
315                }
316            };
317
318            if let Some(wait_reason) = wait_reason {
319                let resume_target = next.clone().ok_or_else(|| {
320                    anyhow!(
321                        "session.wait node {} requires a non-empty route",
322                        current.as_str()
323                    )
324                })?;
325                let mut snapshot_state = state.clone();
326                snapshot_state.clear_egress();
327                let snapshot = FlowSnapshot {
328                    flow_id: ctx.flow_id.to_string(),
329                    next_node: resume_target.as_str().to_string(),
330                    state: snapshot_state,
331                };
332                let output_value = state.clone().finalize_with(None);
333                return Ok(FlowExecution::waiting(
334                    output_value,
335                    FlowWait {
336                        reason: Some(wait_reason),
337                        snapshot,
338                    },
339                ));
340            }
341
342            if should_exit {
343                return Ok(FlowExecution::completed(
344                    state.finalize_with(Some(output.payload.clone())),
345                ));
346            }
347
348            match next {
349                Some(n) => current = n,
350                None => {
351                    return Ok(FlowExecution::completed(
352                        state.finalize_with(Some(output.payload.clone())),
353                    ));
354                }
355            }
356        }
357    }
358
359    async fn dispatch_node(
360        &self,
361        ctx: &FlowContext<'_>,
362        node_id: &str,
363        node: &HostNode,
364        state: &mut ExecutionState,
365        payload: Value,
366    ) -> Result<DispatchOutcome> {
367        match &node.kind {
368            NodeKind::Exec { target_component } => self
369                .execute_component_exec(
370                    ctx,
371                    node_id,
372                    node,
373                    state,
374                    payload,
375                    Some(target_component.as_str()),
376                )
377                .await
378                .map(DispatchOutcome::complete),
379            NodeKind::PackComponent { component_ref } => self
380                .execute_component_exec(
381                    ctx,
382                    node_id,
383                    node,
384                    state,
385                    payload,
386                    Some(component_ref.as_str()),
387                )
388                .await
389                .map(DispatchOutcome::complete),
390            NodeKind::FlowCall => self
391                .execute_flow_call(ctx, payload)
392                .await
393                .map(DispatchOutcome::complete),
394            NodeKind::ProviderInvoke => self
395                .execute_provider_invoke(ctx, node_id, state, payload)
396                .await
397                .map(DispatchOutcome::complete),
398            NodeKind::BuiltinEmit { kind } => {
399                match kind {
400                    EmitKind::Log | EmitKind::Response => {}
401                    EmitKind::Other(component) => {
402                        tracing::debug!(%component, "handling emit.* as builtin");
403                    }
404                }
405                state.push_egress(payload.clone());
406                Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
407            }
408            NodeKind::Wait => {
409                let reason = extract_wait_reason(&payload);
410                Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
411            }
412        }
413    }
414
415    async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
416        #[derive(Deserialize)]
417        struct FlowCallPayload {
418            #[serde(alias = "flow")]
419            flow_id: String,
420            #[serde(default)]
421            input: Value,
422        }
423
424        let call: FlowCallPayload =
425            serde_json::from_value(payload).context("invalid payload for flow.call node")?;
426        if call.flow_id.trim().is_empty() {
427            bail!("flow.call requires a non-empty flow_id");
428        }
429
430        let sub_input = if call.input.is_null() {
431            Value::Null
432        } else {
433            call.input
434        };
435
436        let flow_id_owned = call.flow_id;
437        let action = "flow.call";
438        let sub_ctx = FlowContext {
439            tenant: ctx.tenant,
440            flow_id: flow_id_owned.as_str(),
441            node_id: None,
442            tool: ctx.tool,
443            action: Some(action),
444            session_id: ctx.session_id,
445            provider_id: ctx.provider_id,
446            retry_config: ctx.retry_config,
447            observer: ctx.observer,
448            mocks: ctx.mocks,
449        };
450
451        let execution = Box::pin(self.execute(sub_ctx, sub_input))
452            .await
453            .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
454        match execution.status {
455            FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
456            FlowStatus::Waiting(wait) => bail!(
457                "flow.call cannot pause (flow {} waiting {:?})",
458                flow_id_owned,
459                wait.reason
460            ),
461        }
462    }
463
464    async fn execute_component_exec(
465        &self,
466        ctx: &FlowContext<'_>,
467        node_id: &str,
468        node: &HostNode,
469        state: &ExecutionState,
470        payload: Value,
471        component_override: Option<&str>,
472    ) -> Result<NodeOutput> {
473        #[derive(Deserialize)]
474        struct ComponentPayload {
475            #[serde(default, alias = "component_ref", alias = "component")]
476            component: Option<String>,
477            #[serde(alias = "op")]
478            operation: Option<String>,
479            #[serde(default)]
480            input: Value,
481            #[serde(default)]
482            config: Value,
483        }
484
485        let payload: ComponentPayload =
486            serde_json::from_value(payload).context("invalid payload for component.exec")?;
487        let component_ref = component_override
488            .map(str::to_string)
489            .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
490            .with_context(|| "component.exec requires a component_ref")?;
491        let component_label = node.component.as_str();
492        let operation = match payload.operation.filter(|v| !v.trim().is_empty()) {
493            Some(op) => op,
494            None => {
495                let mut message = format!(
496                    "missing operation for node `{}` (component `{}`); expected node.component.operation to be set",
497                    node_id, component_label,
498                );
499                if let Some(found) = &node.operation_in_mapping {
500                    message.push_str(&format!(
501                        ". Found operation in input.mapping (`{}`) but this is not used; pack compiler must preserve node.component.operation.",
502                        found
503                    ));
504                }
505                bail!(message);
506            }
507        };
508        let mut input = payload.input;
509        if let Value::Object(mut map) = input {
510            map.entry("state".to_string())
511                .or_insert_with(|| state.context());
512            input = Value::Object(map);
513        }
514        let input_json = serde_json::to_string(&input)?;
515        let config_json = if payload.config.is_null() {
516            None
517        } else {
518            Some(serde_json::to_string(&payload.config)?)
519        };
520
521        let pack_idx = *self
522            .flow_sources
523            .get(ctx.flow_id)
524            .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
525        let pack = Arc::clone(&self.packs[pack_idx]);
526        let exec_ctx = component_exec_ctx(ctx, node_id);
527        let value = pack
528            .invoke_component(
529                &component_ref,
530                exec_ctx,
531                &operation,
532                config_json,
533                input_json,
534            )
535            .await?;
536
537        Ok(NodeOutput::new(value))
538    }
539
540    async fn execute_provider_invoke(
541        &self,
542        ctx: &FlowContext<'_>,
543        node_id: &str,
544        state: &ExecutionState,
545        payload: Value,
546    ) -> Result<NodeOutput> {
547        #[derive(Deserialize)]
548        struct ProviderPayload {
549            #[serde(default)]
550            provider_id: Option<String>,
551            #[serde(default)]
552            provider_type: Option<String>,
553            #[serde(default, alias = "operation")]
554            op: Option<String>,
555            #[serde(default)]
556            input: Value,
557            #[serde(default)]
558            in_map: Value,
559            #[serde(default)]
560            out_map: Value,
561            #[serde(default)]
562            err_map: Value,
563        }
564
565        let payload: ProviderPayload =
566            serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
567        let op = payload
568            .op
569            .as_deref()
570            .filter(|v| !v.trim().is_empty())
571            .with_context(|| "provider.invoke requires an op")?
572            .to_string();
573
574        let mut input_value = if !payload.in_map.is_null() {
575            let ctx_value = mapping_ctx(payload.input.clone(), state);
576            apply_mapping(&payload.in_map, &ctx_value)
577                .context("failed to evaluate provider.invoke in_map")?
578        } else if !payload.input.is_null() {
579            payload.input
580        } else {
581            Value::Null
582        };
583
584        if let Value::Object(ref mut map) = input_value {
585            map.entry("state".to_string())
586                .or_insert_with(|| state.context());
587        }
588        let input_json = serde_json::to_vec(&input_value)?;
589
590        let pack_idx = *self
591            .flow_sources
592            .get(ctx.flow_id)
593            .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
594        let pack = Arc::clone(&self.packs[pack_idx]);
595        let binding = pack.resolve_provider(
596            payload.provider_id.as_deref(),
597            payload.provider_type.as_deref(),
598        )?;
599        let exec_ctx = component_exec_ctx(ctx, node_id);
600        let result = pack
601            .invoke_provider(&binding, exec_ctx, &op, input_json)
602            .await?;
603
604        let output = if payload.out_map.is_null() {
605            result
606        } else {
607            let ctx_value = mapping_ctx(result, state);
608            apply_mapping(&payload.out_map, &ctx_value)
609                .context("failed to evaluate provider.invoke out_map")?
610        };
611        let _ = payload.err_map;
612        Ok(NodeOutput::new(output))
613    }
614
615    pub fn flows(&self) -> &[FlowDescriptor] {
616        &self.flows
617    }
618
619    pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
620        self.flows
621            .iter()
622            .find(|descriptor| descriptor.flow_type == flow_type)
623    }
624
625    pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
626        self.flows
627            .iter()
628            .find(|descriptor| descriptor.id == flow_id)
629    }
630}
631
632pub trait ExecutionObserver: Send + Sync {
633    fn on_node_start(&self, event: &NodeEvent<'_>);
634    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
635    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
636}
637
638pub struct NodeEvent<'a> {
639    pub context: &'a FlowContext<'a>,
640    pub node_id: &'a str,
641    pub node: &'a HostNode,
642    pub payload: &'a Value,
643}
644
645#[derive(Clone, Debug, Serialize, Deserialize)]
646pub struct ExecutionState {
647    input: Value,
648    nodes: HashMap<String, NodeOutput>,
649    egress: Vec<Value>,
650}
651
652impl ExecutionState {
653    fn new(input: Value) -> Self {
654        Self {
655            input,
656            nodes: HashMap::new(),
657            egress: Vec::new(),
658        }
659    }
660
661    fn context(&self) -> Value {
662        let mut nodes = JsonMap::new();
663        for (id, output) in &self.nodes {
664            nodes.insert(
665                id.clone(),
666                json!({
667                    "ok": output.ok,
668                    "payload": output.payload.clone(),
669                    "meta": output.meta.clone(),
670                }),
671            );
672        }
673        json!({
674            "input": self.input.clone(),
675            "nodes": nodes,
676        })
677    }
678    fn push_egress(&mut self, payload: Value) {
679        self.egress.push(payload);
680    }
681
682    fn replace_input(&mut self, input: Value) {
683        self.input = input;
684    }
685
686    fn clear_egress(&mut self) {
687        self.egress.clear();
688    }
689
690    fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
691        if self.egress.is_empty() {
692            return final_payload.unwrap_or(Value::Null);
693        }
694        let mut emitted = std::mem::take(&mut self.egress);
695        if let Some(value) = final_payload {
696            match value {
697                Value::Null => {}
698                Value::Array(items) => emitted.extend(items),
699                other => emitted.push(other),
700            }
701        }
702        Value::Array(emitted)
703    }
704}
705
706#[derive(Clone, Debug, Serialize, Deserialize)]
707struct NodeOutput {
708    ok: bool,
709    payload: Value,
710    meta: Value,
711}
712
713impl NodeOutput {
714    fn new(payload: Value) -> Self {
715        Self {
716            ok: true,
717            payload,
718            meta: Value::Null,
719        }
720    }
721}
722
723struct DispatchOutcome {
724    output: NodeOutput,
725    wait_reason: Option<String>,
726}
727
728impl DispatchOutcome {
729    fn complete(output: NodeOutput) -> Self {
730        Self {
731            output,
732            wait_reason: None,
733        }
734    }
735
736    fn wait(output: NodeOutput, reason: Option<String>) -> Self {
737        Self {
738            output,
739            wait_reason: reason,
740        }
741    }
742}
743
744fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
745    ComponentExecCtx {
746        tenant: ComponentTenantCtx {
747            tenant: ctx.tenant.to_string(),
748            team: None,
749            user: ctx.provider_id.map(str::to_string),
750            trace_id: None,
751            correlation_id: ctx.session_id.map(str::to_string),
752            deadline_unix_ms: None,
753            attempt: 1,
754            idempotency_key: ctx.session_id.map(str::to_string),
755        },
756        flow_id: ctx.flow_id.to_string(),
757        node_id: Some(node_id.to_string()),
758    }
759}
760
761fn extract_wait_reason(payload: &Value) -> Option<String> {
762    match payload {
763        Value::String(s) => Some(s.clone()),
764        Value::Object(map) => map
765            .get("reason")
766            .and_then(Value::as_str)
767            .map(|value| value.to_string()),
768        _ => None,
769    }
770}
771
772fn mapping_ctx(root: Value, state: &ExecutionState) -> Value {
773    json!({
774        "input": root.clone(),
775        "result": root,
776        "state": state.context(),
777    })
778}
779
780fn apply_mapping(template: &Value, ctx: &Value) -> Result<Value> {
781    match template {
782        Value::String(path) if path.starts_with('/') => ctx
783            .pointer(path)
784            .cloned()
785            .ok_or_else(|| anyhow!("mapping path `{path}` not found")),
786        Value::Array(items) => {
787            let mut mapped = Vec::with_capacity(items.len());
788            for item in items {
789                mapped.push(apply_mapping(item, ctx)?);
790            }
791            Ok(Value::Array(mapped))
792        }
793        Value::Object(map) => {
794            let mut out = serde_json::Map::new();
795            for (key, value) in map {
796                out.insert(key.clone(), apply_mapping(value, ctx)?);
797            }
798            Ok(Value::Object(out))
799        }
800        other => Ok(other.clone()),
801    }
802}
803
804impl From<Flow> for HostFlow {
805    fn from(value: Flow) -> Self {
806        let mut nodes = IndexMap::new();
807        for (id, node) in value.nodes {
808            nodes.insert(id.clone(), HostNode::from(node));
809        }
810        let start = value
811            .entrypoints
812            .get("default")
813            .and_then(Value::as_str)
814            .and_then(|id| NodeId::from_str(id).ok())
815            .or_else(|| nodes.keys().next().cloned());
816        Self {
817            id: value.id.as_str().to_string(),
818            start,
819            nodes,
820        }
821    }
822}
823
824impl From<Node> for HostNode {
825    fn from(node: Node) -> Self {
826        let component_ref = node.component.id.as_str().to_string();
827        let operation_in_mapping = extract_operation_from_mapping(&node.input.mapping);
828        let kind = match component_ref.as_str() {
829            "component.exec" => {
830                let target = extract_target_component(&node.input.mapping)
831                    .unwrap_or_else(|| "component.exec".to_string());
832                if target.starts_with("emit.") {
833                    NodeKind::BuiltinEmit {
834                        kind: emit_kind_from_ref(&target),
835                    }
836                } else {
837                    NodeKind::Exec {
838                        target_component: target,
839                    }
840                }
841            }
842            "flow.call" => NodeKind::FlowCall,
843            "provider.invoke" => NodeKind::ProviderInvoke,
844            "session.wait" => NodeKind::Wait,
845            comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
846                kind: emit_kind_from_ref(comp),
847            },
848            other => NodeKind::PackComponent {
849                component_ref: other.to_string(),
850            },
851        };
852        let component_label = match &kind {
853            NodeKind::Exec { .. } => "component.exec".to_string(),
854            NodeKind::PackComponent { component_ref } => component_ref.clone(),
855            NodeKind::ProviderInvoke => "provider.invoke".to_string(),
856            NodeKind::FlowCall => "flow.call".to_string(),
857            NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
858            NodeKind::Wait => "session.wait".to_string(),
859        };
860        let payload_expr = match kind {
861            NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
862            _ => node.input.mapping.clone(),
863        };
864        let payload_expr = match &kind {
865            // Packs may encode the component operation on the FlowComponentRef instead of in the
866            // payload. Preserve it so component.exec sees the bound operation.
867            NodeKind::Exec { .. } | NodeKind::PackComponent { .. } => {
868                merge_operation(payload_expr, node.component.operation.as_deref())
869            }
870            _ => payload_expr,
871        };
872        Self {
873            kind,
874            component: component_label,
875            operation_in_mapping,
876            payload_expr,
877            routing: node.routing,
878        }
879    }
880}
881
882fn extract_target_component(payload: &Value) -> Option<String> {
883    match payload {
884        Value::Object(map) => map
885            .get("component")
886            .or_else(|| map.get("component_ref"))
887            .and_then(Value::as_str)
888            .map(|s| s.to_string()),
889        _ => None,
890    }
891}
892
893fn extract_operation_from_mapping(payload: &Value) -> Option<String> {
894    match payload {
895        Value::Object(map) => map
896            .get("operation")
897            .or_else(|| map.get("op"))
898            .and_then(Value::as_str)
899            .map(str::trim)
900            .filter(|value| !value.is_empty())
901            .map(|value| value.to_string()),
902        _ => None,
903    }
904}
905
906fn extract_emit_payload(payload: &Value) -> Value {
907    if let Value::Object(map) = payload {
908        if let Some(input) = map.get("input") {
909            return input.clone();
910        }
911        if let Some(inner) = map.get("payload") {
912            return inner.clone();
913        }
914    }
915    payload.clone()
916}
917
918fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
919    match component_ref {
920        "emit.log" => EmitKind::Log,
921        "emit.response" => EmitKind::Response,
922        other => EmitKind::Other(other.to_string()),
923    }
924}
925
926fn emit_ref_from_kind(kind: &EmitKind) -> String {
927    match kind {
928        EmitKind::Log => "emit.log".to_string(),
929        EmitKind::Response => "emit.response".to_string(),
930        EmitKind::Other(other) => other.clone(),
931    }
932}
933
934fn merge_operation(payload: Value, operation: Option<&str>) -> Value {
935    let Some(op) = operation else { return payload };
936    match payload {
937        Value::Object(mut map) => {
938            let set_op = match map.get("operation") {
939                Some(Value::String(existing)) => existing.trim().is_empty(),
940                None => true,
941                _ => true,
942            };
943            if set_op {
944                map.insert("operation".into(), Value::String(op.to_string()));
945            }
946            Value::Object(map)
947        }
948        Value::Null => {
949            let mut map = JsonMap::new();
950            map.insert("operation".into(), Value::String(op.to_string()));
951            Value::Object(map)
952        }
953        other => other,
954    }
955}
956
957#[cfg(test)]
958mod tests {
959    use super::*;
960    use serde_json::json;
961    use tokio::runtime::Runtime;
962
963    fn minimal_engine() -> FlowEngine {
964        FlowEngine {
965            packs: Vec::new(),
966            flows: Vec::new(),
967            flow_sources: HashMap::new(),
968            flow_cache: RwLock::new(HashMap::new()),
969            default_env: "local".to_string(),
970        }
971    }
972
973    #[test]
974    fn templating_renders_with_partials_and_data() {
975        let mut state = ExecutionState::new(json!({ "city": "London" }));
976        state.nodes.insert(
977            "forecast".to_string(),
978            NodeOutput::new(json!({ "temp": "20C" })),
979        );
980
981        // templating handled via component now; ensure context still includes node outputs
982        let ctx = state.context();
983        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
984    }
985
986    #[test]
987    fn finalize_wraps_emitted_payloads() {
988        let mut state = ExecutionState::new(json!({}));
989        state.push_egress(json!({ "text": "first" }));
990        state.push_egress(json!({ "text": "second" }));
991        let result = state.finalize_with(Some(json!({ "text": "final" })));
992        assert_eq!(
993            result,
994            json!([
995                { "text": "first" },
996                { "text": "second" },
997                { "text": "final" }
998            ])
999        );
1000    }
1001
1002    #[test]
1003    fn finalize_flattens_final_array() {
1004        let mut state = ExecutionState::new(json!({}));
1005        state.push_egress(json!({ "text": "only" }));
1006        let result = state.finalize_with(Some(json!([
1007            { "text": "extra-1" },
1008            { "text": "extra-2" }
1009        ])));
1010        assert_eq!(
1011            result,
1012            json!([
1013                { "text": "only" },
1014                { "text": "extra-1" },
1015                { "text": "extra-2" }
1016            ])
1017        );
1018    }
1019
1020    #[test]
1021    fn missing_operation_reports_node_and_component() {
1022        let engine = minimal_engine();
1023        let rt = Runtime::new().unwrap();
1024        let retry_config = RetryConfig {
1025            max_attempts: 1,
1026            base_delay_ms: 1,
1027        };
1028        let ctx = FlowContext {
1029            tenant: "tenant",
1030            flow_id: "flow",
1031            node_id: Some("missing-op"),
1032            tool: None,
1033            action: None,
1034            session_id: None,
1035            provider_id: None,
1036            retry_config,
1037            observer: None,
1038            mocks: None,
1039        };
1040        let node = HostNode {
1041            kind: NodeKind::Exec {
1042                target_component: "qa.process".into(),
1043            },
1044            component: "component.exec".into(),
1045            operation_in_mapping: None,
1046            payload_expr: Value::Null,
1047            routing: Routing::End,
1048        };
1049        let state = ExecutionState::new(Value::Null);
1050        let payload = json!({ "component": "qa.process" });
1051        let err = rt
1052            .block_on(engine.execute_component_exec(
1053                &ctx,
1054                "missing-op",
1055                &node,
1056                &state,
1057                payload,
1058                None,
1059            ))
1060            .unwrap_err();
1061        let message = err.to_string();
1062        assert!(
1063            message.contains("missing operation for node `missing-op`"),
1064            "unexpected message: {message}"
1065        );
1066        assert!(
1067            message.contains("(component `component.exec`)"),
1068            "unexpected message: {message}"
1069        );
1070    }
1071
1072    #[test]
1073    fn missing_operation_mentions_mapping_hint() {
1074        let engine = minimal_engine();
1075        let rt = Runtime::new().unwrap();
1076        let retry_config = RetryConfig {
1077            max_attempts: 1,
1078            base_delay_ms: 1,
1079        };
1080        let ctx = FlowContext {
1081            tenant: "tenant",
1082            flow_id: "flow",
1083            node_id: Some("missing-op-hint"),
1084            tool: None,
1085            action: None,
1086            session_id: None,
1087            provider_id: None,
1088            retry_config,
1089            observer: None,
1090            mocks: None,
1091        };
1092        let node = HostNode {
1093            kind: NodeKind::Exec {
1094                target_component: "qa.process".into(),
1095            },
1096            component: "component.exec".into(),
1097            operation_in_mapping: Some("render".into()),
1098            payload_expr: Value::Null,
1099            routing: Routing::End,
1100        };
1101        let state = ExecutionState::new(Value::Null);
1102        let payload = json!({ "component": "qa.process" });
1103        let err = rt
1104            .block_on(engine.execute_component_exec(
1105                &ctx,
1106                "missing-op-hint",
1107                &node,
1108                &state,
1109                payload,
1110                None,
1111            ))
1112            .unwrap_err();
1113        let message = err.to_string();
1114        assert!(
1115            message.contains("missing operation for node `missing-op-hint`"),
1116            "unexpected message: {message}"
1117        );
1118        assert!(
1119            message.contains("Found operation in input.mapping (`render`)"),
1120            "unexpected message: {message}"
1121        );
1122    }
1123}
1124
1125use tracing::Instrument;
1126
1127pub struct FlowContext<'a> {
1128    pub tenant: &'a str,
1129    pub flow_id: &'a str,
1130    pub node_id: Option<&'a str>,
1131    pub tool: Option<&'a str>,
1132    pub action: Option<&'a str>,
1133    pub session_id: Option<&'a str>,
1134    pub provider_id: Option<&'a str>,
1135    pub retry_config: RetryConfig,
1136    pub observer: Option<&'a dyn ExecutionObserver>,
1137    pub mocks: Option<&'a MockLayer>,
1138}
1139
1140#[derive(Copy, Clone)]
1141pub struct RetryConfig {
1142    pub max_attempts: u32,
1143    pub base_delay_ms: u64,
1144}
1145
1146fn should_retry(err: &anyhow::Error) -> bool {
1147    let lower = err.to_string().to_lowercase();
1148    lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
1149}
1150
1151impl From<FlowRetryConfig> for RetryConfig {
1152    fn from(value: FlowRetryConfig) -> Self {
1153        Self {
1154            max_attempts: value.max_attempts.max(1),
1155            base_delay_ms: value.base_delay_ms.max(50),
1156        }
1157    }
1158}