Skip to main content

greentic_runner_host/runner/
engine.rs

1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use crate::config::{FlowRetryConfig, HostConfig};
18use crate::pack::{FlowDescriptor, PackRuntime};
19use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
20use greentic_types::{Flow, Node, NodeId, Routing};
21
/// Flow execution engine backed by a set of loaded packs.
///
/// Keeps the flow descriptors reported by the packs, remembers which pack
/// provides each flow id, and caches loaded flow definitions.
pub struct FlowEngine {
    /// Pack runtimes; indexed by the values stored in `flow_sources`.
    packs: Vec<Arc<PackRuntime>>,
    /// Descriptors of the registered flows (one per distinct flow id).
    flows: Vec<FlowDescriptor>,
    /// Maps a flow id to the index in `packs` of the pack that provides it.
    flow_sources: HashMap<String, usize>,
    /// Loaded flow definitions keyed by flow id.
    flow_cache: RwLock<HashMap<String, HostFlow>>,
    /// Environment name handed to `set_flow_context` on every execution.
    default_env: String,
}
29
/// Serializable snapshot of a paused flow, sufficient to resume it later.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FlowSnapshot {
    /// Id of the flow that was paused.
    pub flow_id: String,
    /// Id of the node at which execution continues on resume.
    pub next_node: String,
    /// Execution state captured when the flow paused.
    pub state: ExecutionState,
}
36
/// Details of a flow that paused on a wait node.
#[derive(Clone, Debug)]
pub struct FlowWait {
    /// Optional reason extracted from the wait node's payload.
    pub reason: Option<String>,
    /// Snapshot to pass back in order to resume the flow.
    pub snapshot: FlowSnapshot,
}
42
/// Terminal status of a single run of the flow driver.
#[derive(Clone, Debug)]
pub enum FlowStatus {
    /// The flow ran until a node with no successor.
    Completed,
    /// The flow paused on a wait node and can be resumed from the snapshot.
    Waiting(FlowWait),
}
48
/// Result of executing (or resuming) a flow.
#[derive(Clone, Debug)]
pub struct FlowExecution {
    /// Value produced by the run (see `ExecutionState::finalize_with`).
    pub output: Value,
    /// Whether the flow completed or is waiting to be resumed.
    pub status: FlowStatus,
}
54
/// Host-side representation of a flow, converted from `greentic_types::Flow`.
#[derive(Clone, Debug)]
struct HostFlow {
    /// Flow id as a plain string.
    id: String,
    /// Node at which execution starts, if one could be determined.
    start: Option<NodeId>,
    /// Nodes of the flow, in the order they were inserted.
    nodes: IndexMap<NodeId, HostNode>,
}
61
/// Host-side representation of a single flow node, converted from
/// `greentic_types::Node`.
#[derive(Clone, Debug)]
pub struct HostNode {
    /// How the engine dispatches this node.
    kind: NodeKind,
    /// Backwards-compatible component label for observers/transcript.
    pub component: String,
    /// `"component.exec"` for exec-style nodes, otherwise the node's component id.
    component_id: String,
    /// Operation declared on the node's component, unless that operation is
    /// the literal `"component.exec"` marker.
    operation_name: Option<String>,
    /// Operation name found inside the node's input mapping, if any.
    operation_in_mapping: Option<String>,
    /// Payload handed to the dispatcher: the input mapping, or the extracted
    /// emit payload for builtin emit nodes.
    payload_expr: Value,
    /// Routing rule that selects the next node.
    routing: Routing,
}
73
/// Dispatch category of a node, decided when the node is converted.
#[derive(Clone, Debug)]
enum NodeKind {
    /// `component.exec` style node; `target_component` overrides the component
    /// named in the payload.
    Exec { target_component: String },
    /// Direct call to a component identified by `component_ref`.
    PackComponent { component_ref: String },
    /// `provider.invoke` node.
    ProviderInvoke,
    /// `flow.call` node that runs another flow.
    FlowCall,
    /// `emit.*` node handled by the engine itself: its payload is appended to
    /// the egress buffer.
    BuiltinEmit { kind: EmitKind },
    /// `session.wait` node that pauses the flow.
    Wait,
}
83
/// Flavor of a builtin emit node.
#[derive(Clone, Debug)]
enum EmitKind {
    /// Log emit (presumably `emit.log` — the mapping lives in `emit_kind_from_ref`).
    Log,
    /// Response emit (presumably `emit.response` — see `emit_kind_from_ref`).
    Response,
    /// Any other emit reference, carried as a string.
    Other(String),
}
90
/// Node-level values that take part in resolving a `component.exec` call.
struct ComponentOverrides<'a> {
    /// Component reference that wins over the one in the payload, when set.
    component: Option<&'a str>,
    /// Operation name passed to `resolve_component_operation`, when set.
    operation: Option<&'a str>,
}
95
/// Fully resolved component invocation, ready to be sent to a pack.
struct ComponentCall {
    /// Component to invoke.
    component_ref: String,
    /// Operation to invoke on the component.
    operation: String,
    /// Input value; serialized to JSON before the call.
    input: Value,
    /// Configuration value; omitted from the call when it is JSON null.
    config: Value,
}
102
103impl FlowExecution {
104    fn completed(output: Value) -> Self {
105        Self {
106            output,
107            status: FlowStatus::Completed,
108        }
109    }
110
111    fn waiting(output: Value, wait: FlowWait) -> Self {
112        Self {
113            output,
114            status: FlowStatus::Waiting(wait),
115        }
116    }
117}
118
119impl FlowEngine {
120    pub async fn new(packs: Vec<Arc<PackRuntime>>, _config: Arc<HostConfig>) -> Result<Self> {
121        let mut flow_sources = HashMap::new();
122        let mut descriptors = Vec::new();
123        for (idx, pack) in packs.iter().enumerate() {
124            let flows = pack.list_flows().await?;
125            for flow in flows {
126                tracing::info!(
127                    flow_id = %flow.id,
128                    flow_type = %flow.flow_type,
129                    pack_index = idx,
130                    "registered flow"
131                );
132                flow_sources.insert(flow.id.clone(), idx);
133                descriptors.retain(|existing: &FlowDescriptor| existing.id != flow.id);
134                descriptors.push(flow);
135            }
136        }
137
138        let mut flow_map = HashMap::new();
139        for flow in &descriptors {
140            if let Some(&pack_idx) = flow_sources.get(&flow.id) {
141                let pack_clone = Arc::clone(&packs[pack_idx]);
142                let flow_id = flow.id.clone();
143                let task_flow_id = flow_id.clone();
144                match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
145                    Ok(Ok(flow)) => {
146                        flow_map.insert(flow_id, HostFlow::from(flow));
147                    }
148                    Ok(Err(err)) => {
149                        tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
150                    }
151                    Err(err) => {
152                        tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
153                    }
154                }
155            }
156        }
157
158        Ok(Self {
159            packs,
160            flows: descriptors,
161            flow_sources,
162            flow_cache: RwLock::new(flow_map),
163            default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
164        })
165    }
166
167    async fn get_or_load_flow(&self, flow_id: &str) -> Result<HostFlow> {
168        if let Some(flow) = self.flow_cache.read().get(flow_id).cloned() {
169            return Ok(flow);
170        }
171
172        let pack_idx = *self
173            .flow_sources
174            .get(flow_id)
175            .with_context(|| format!("flow {flow_id} not registered"))?;
176        let pack = Arc::clone(&self.packs[pack_idx]);
177        let flow_id_owned = flow_id.to_string();
178        let task_flow_id = flow_id_owned.clone();
179        let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
180            .await
181            .context("failed to join flow metadata task")??;
182        let host_flow = HostFlow::from(flow);
183        self.flow_cache
184            .write()
185            .insert(flow_id_owned.clone(), host_flow.clone());
186        Ok(host_flow)
187    }
188
    /// Executes a flow from its start node, retrying failures that
    /// `should_retry` accepts.
    ///
    /// A `flow.execute` span is created and annotated from `ctx`, and the flow
    /// context is published through `set_flow_context` before the first
    /// attempt. Each attempt runs the whole flow again with a clone of `input`.
    ///
    /// # Errors
    /// Returns the error of the last attempt once `retry_config.max_attempts`
    /// is reached or the error is not retryable.
    pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
        // Fields start empty; `annotate_span` fills them in from the context.
        let span = tracing::info_span!(
            "flow.execute",
            tenant = tracing::field::Empty,
            flow_id = tracing::field::Empty,
            node_id = tracing::field::Empty,
            tool = tracing::field::Empty,
            action = tracing::field::Empty
        );
        annotate_span(
            &span,
            &FlowSpanAttributes {
                tenant: ctx.tenant,
                flow_id: ctx.flow_id,
                node_id: ctx.node_id,
                tool: ctx.tool,
                action: ctx.action,
            },
        );
        set_flow_context(
            &self.default_env,
            ctx.tenant,
            ctx.flow_id,
            ctx.node_id,
            ctx.provider_id,
            ctx.session_id,
        );
        let retry_config = ctx.retry_config;
        let original_input = input;
        async move {
            // 1-based counter of attempts made so far.
            let mut attempt = 0u32;
            loop {
                attempt += 1;
                match self.execute_once(&ctx, original_input.clone()).await {
                    Ok(value) => return Ok(value),
                    Err(err) => {
                        // Give up when the budget is exhausted or the error is not retryable.
                        if attempt >= retry_config.max_attempts || !should_retry(&err) {
                            return Err(err);
                        }
                        // The backoff helper is given the zero-based retry index.
                        let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
                        tracing::warn!(
                            tenant = ctx.tenant,
                            flow_id = ctx.flow_id,
                            attempt,
                            max_attempts = retry_config.max_attempts,
                            delay_ms = delay,
                            error = %err,
                            "transient flow execution failure, backing off"
                        );
                        tokio::time::sleep(Duration::from_millis(delay)).await;
                    }
                }
            }
        }
        .instrument(span)
        .await
    }
246
247    pub async fn resume(
248        &self,
249        ctx: FlowContext<'_>,
250        snapshot: FlowSnapshot,
251        input: Value,
252    ) -> Result<FlowExecution> {
253        if snapshot.flow_id != ctx.flow_id {
254            bail!(
255                "snapshot flow {} does not match requested {}",
256                snapshot.flow_id,
257                ctx.flow_id
258            );
259        }
260        let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
261        let mut state = snapshot.state;
262        state.replace_input(input);
263        self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
264            .await
265    }
266
267    async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
268        let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
269        let state = ExecutionState::new(input);
270        self.drive_flow(ctx, flow_ir, state, None).await
271    }
272
273    async fn drive_flow(
274        &self,
275        ctx: &FlowContext<'_>,
276        flow_ir: HostFlow,
277        mut state: ExecutionState,
278        resume_from: Option<String>,
279    ) -> Result<FlowExecution> {
280        let mut current = match resume_from {
281            Some(node) => NodeId::from_str(&node)
282                .with_context(|| format!("invalid resume node id `{node}`"))?,
283            None => flow_ir
284                .start
285                .clone()
286                .or_else(|| flow_ir.nodes.keys().next().cloned())
287                .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
288        };
289
290        loop {
291            let node = flow_ir
292                .nodes
293                .get(&current)
294                .with_context(|| format!("node {} not found", current.as_str()))?;
295
296            let payload = node.payload_expr.clone();
297            let observed_payload = payload.clone();
298            let node_id = current.clone();
299            let event = NodeEvent {
300                context: ctx,
301                node_id: node_id.as_str(),
302                node,
303                payload: &observed_payload,
304            };
305            if let Some(observer) = ctx.observer {
306                observer.on_node_start(&event);
307            }
308            let DispatchOutcome {
309                output,
310                wait_reason,
311            } = self
312                .dispatch_node(ctx, node_id.as_str(), node, &mut state, payload)
313                .await?;
314
315            state.nodes.insert(node_id.clone().into(), output.clone());
316
317            let (next, should_exit) = match &node.routing {
318                Routing::Next { node_id } => (Some(node_id.clone()), false),
319                Routing::End | Routing::Reply => (None, true),
320                Routing::Branch { default, .. } => (default.clone(), default.is_none()),
321                Routing::Custom(raw) => {
322                    tracing::warn!(
323                        flow_id = %flow_ir.id,
324                        node_id = %node_id,
325                        routing = ?raw,
326                        "unsupported routing; terminating flow"
327                    );
328                    (None, true)
329                }
330            };
331
332            if let Some(wait_reason) = wait_reason {
333                let resume_target = next.clone().ok_or_else(|| {
334                    anyhow!(
335                        "session.wait node {} requires a non-empty route",
336                        current.as_str()
337                    )
338                })?;
339                let mut snapshot_state = state.clone();
340                snapshot_state.clear_egress();
341                let snapshot = FlowSnapshot {
342                    flow_id: ctx.flow_id.to_string(),
343                    next_node: resume_target.as_str().to_string(),
344                    state: snapshot_state,
345                };
346                let output_value = state.clone().finalize_with(None);
347                return Ok(FlowExecution::waiting(
348                    output_value,
349                    FlowWait {
350                        reason: Some(wait_reason),
351                        snapshot,
352                    },
353                ));
354            }
355
356            if should_exit {
357                return Ok(FlowExecution::completed(
358                    state.finalize_with(Some(output.payload.clone())),
359                ));
360            }
361
362            match next {
363                Some(n) => current = n,
364                None => {
365                    return Ok(FlowExecution::completed(
366                        state.finalize_with(Some(output.payload.clone())),
367                    ));
368                }
369            }
370        }
371    }
372
373    async fn dispatch_node(
374        &self,
375        ctx: &FlowContext<'_>,
376        node_id: &str,
377        node: &HostNode,
378        state: &mut ExecutionState,
379        payload: Value,
380    ) -> Result<DispatchOutcome> {
381        match &node.kind {
382            NodeKind::Exec { target_component } => self
383                .execute_component_exec(
384                    ctx,
385                    node_id,
386                    node,
387                    state,
388                    payload,
389                    ComponentOverrides {
390                        component: Some(target_component.as_str()),
391                        operation: node.operation_name.as_deref(),
392                    },
393                )
394                .await
395                .map(DispatchOutcome::complete),
396            NodeKind::PackComponent { component_ref } => self
397                .execute_component_call(ctx, node_id, node, state, payload, component_ref.as_str())
398                .await
399                .map(DispatchOutcome::complete),
400            NodeKind::FlowCall => self
401                .execute_flow_call(ctx, payload)
402                .await
403                .map(DispatchOutcome::complete),
404            NodeKind::ProviderInvoke => self
405                .execute_provider_invoke(ctx, node_id, state, payload)
406                .await
407                .map(DispatchOutcome::complete),
408            NodeKind::BuiltinEmit { kind } => {
409                match kind {
410                    EmitKind::Log | EmitKind::Response => {}
411                    EmitKind::Other(component) => {
412                        tracing::debug!(%component, "handling emit.* as builtin");
413                    }
414                }
415                state.push_egress(payload.clone());
416                Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
417            }
418            NodeKind::Wait => {
419                let reason = extract_wait_reason(&payload);
420                Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
421            }
422        }
423    }
424
425    async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
426        #[derive(Deserialize)]
427        struct FlowCallPayload {
428            #[serde(alias = "flow")]
429            flow_id: String,
430            #[serde(default)]
431            input: Value,
432        }
433
434        let call: FlowCallPayload =
435            serde_json::from_value(payload).context("invalid payload for flow.call node")?;
436        if call.flow_id.trim().is_empty() {
437            bail!("flow.call requires a non-empty flow_id");
438        }
439
440        let sub_input = if call.input.is_null() {
441            Value::Null
442        } else {
443            call.input
444        };
445
446        let flow_id_owned = call.flow_id;
447        let action = "flow.call";
448        let sub_ctx = FlowContext {
449            tenant: ctx.tenant,
450            flow_id: flow_id_owned.as_str(),
451            node_id: None,
452            tool: ctx.tool,
453            action: Some(action),
454            session_id: ctx.session_id,
455            provider_id: ctx.provider_id,
456            retry_config: ctx.retry_config,
457            observer: ctx.observer,
458            mocks: ctx.mocks,
459        };
460
461        let execution = Box::pin(self.execute(sub_ctx, sub_input))
462            .await
463            .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
464        match execution.status {
465            FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
466            FlowStatus::Waiting(wait) => bail!(
467                "flow.call cannot pause (flow {} waiting {:?})",
468                flow_id_owned,
469                wait.reason
470            ),
471        }
472    }
473
474    async fn execute_component_exec(
475        &self,
476        ctx: &FlowContext<'_>,
477        node_id: &str,
478        node: &HostNode,
479        state: &ExecutionState,
480        payload: Value,
481        overrides: ComponentOverrides<'_>,
482    ) -> Result<NodeOutput> {
483        #[derive(Deserialize)]
484        struct ComponentPayload {
485            #[serde(default, alias = "component_ref", alias = "component")]
486            component: Option<String>,
487            #[serde(alias = "op")]
488            operation: Option<String>,
489            #[serde(default)]
490            input: Value,
491            #[serde(default)]
492            config: Value,
493        }
494
495        let payload: ComponentPayload =
496            serde_json::from_value(payload).context("invalid payload for component.exec")?;
497        let component_ref = overrides
498            .component
499            .map(str::to_string)
500            .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
501            .with_context(|| "component.exec requires a component_ref")?;
502        let operation = resolve_component_operation(
503            node_id,
504            node.component_id.as_str(),
505            payload.operation,
506            overrides.operation,
507            node.operation_in_mapping.as_deref(),
508        )?;
509        let call = ComponentCall {
510            component_ref,
511            operation,
512            input: payload.input,
513            config: payload.config,
514        };
515
516        self.invoke_component_call(ctx, node_id, state, call).await
517    }
518
519    async fn execute_component_call(
520        &self,
521        ctx: &FlowContext<'_>,
522        node_id: &str,
523        node: &HostNode,
524        state: &ExecutionState,
525        payload: Value,
526        component_ref: &str,
527    ) -> Result<NodeOutput> {
528        let payload_operation = extract_operation_from_mapping(&payload);
529        let (input, config) = split_operation_payload(payload);
530        let operation = resolve_component_operation(
531            node_id,
532            node.component_id.as_str(),
533            payload_operation,
534            node.operation_name.as_deref(),
535            node.operation_in_mapping.as_deref(),
536        )?;
537        let call = ComponentCall {
538            component_ref: component_ref.to_string(),
539            operation,
540            input,
541            config,
542        };
543        self.invoke_component_call(ctx, node_id, state, call).await
544    }
545
546    async fn invoke_component_call(
547        &self,
548        ctx: &FlowContext<'_>,
549        node_id: &str,
550        state: &ExecutionState,
551        mut call: ComponentCall,
552    ) -> Result<NodeOutput> {
553        if let Value::Object(ref mut map) = call.input {
554            map.entry("state".to_string())
555                .or_insert_with(|| state.context());
556        }
557        let input_json = serde_json::to_string(&call.input)?;
558        let config_json = if call.config.is_null() {
559            None
560        } else {
561            Some(serde_json::to_string(&call.config)?)
562        };
563
564        let pack_idx = *self
565            .flow_sources
566            .get(ctx.flow_id)
567            .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
568        let pack = Arc::clone(&self.packs[pack_idx]);
569        let exec_ctx = component_exec_ctx(ctx, node_id);
570        let value = pack
571            .invoke_component(
572                call.component_ref.as_str(),
573                exec_ctx,
574                call.operation.as_str(),
575                config_json,
576                input_json,
577            )
578            .await?;
579
580        Ok(NodeOutput::new(value))
581    }
582
583    async fn execute_provider_invoke(
584        &self,
585        ctx: &FlowContext<'_>,
586        node_id: &str,
587        state: &ExecutionState,
588        payload: Value,
589    ) -> Result<NodeOutput> {
590        #[derive(Deserialize)]
591        struct ProviderPayload {
592            #[serde(default)]
593            provider_id: Option<String>,
594            #[serde(default)]
595            provider_type: Option<String>,
596            #[serde(default, alias = "operation")]
597            op: Option<String>,
598            #[serde(default)]
599            input: Value,
600            #[serde(default)]
601            in_map: Value,
602            #[serde(default)]
603            out_map: Value,
604            #[serde(default)]
605            err_map: Value,
606        }
607
608        let payload: ProviderPayload =
609            serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
610        let op = payload
611            .op
612            .as_deref()
613            .filter(|v| !v.trim().is_empty())
614            .with_context(|| "provider.invoke requires an op")?
615            .to_string();
616
617        let mut input_value = if !payload.in_map.is_null() {
618            let ctx_value = mapping_ctx(payload.input.clone(), state);
619            apply_mapping(&payload.in_map, &ctx_value)
620                .context("failed to evaluate provider.invoke in_map")?
621        } else if !payload.input.is_null() {
622            payload.input
623        } else {
624            Value::Null
625        };
626
627        if let Value::Object(ref mut map) = input_value {
628            map.entry("state".to_string())
629                .or_insert_with(|| state.context());
630        }
631        let input_json = serde_json::to_vec(&input_value)?;
632
633        let pack_idx = *self
634            .flow_sources
635            .get(ctx.flow_id)
636            .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
637        let pack = Arc::clone(&self.packs[pack_idx]);
638        let binding = pack.resolve_provider(
639            payload.provider_id.as_deref(),
640            payload.provider_type.as_deref(),
641        )?;
642        let exec_ctx = component_exec_ctx(ctx, node_id);
643        let result = pack
644            .invoke_provider(&binding, exec_ctx, &op, input_json)
645            .await?;
646
647        let output = if payload.out_map.is_null() {
648            result
649        } else {
650            let ctx_value = mapping_ctx(result, state);
651            apply_mapping(&payload.out_map, &ctx_value)
652                .context("failed to evaluate provider.invoke out_map")?
653        };
654        let _ = payload.err_map;
655        Ok(NodeOutput::new(output))
656    }
657
658    pub fn flows(&self) -> &[FlowDescriptor] {
659        &self.flows
660    }
661
662    pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
663        self.flows
664            .iter()
665            .find(|descriptor| descriptor.flow_type == flow_type)
666    }
667
668    pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
669        self.flows
670            .iter()
671            .find(|descriptor| descriptor.id == flow_id)
672    }
673}
674
/// Callbacks for observing node execution inside a flow.
///
/// Implementations must be `Send + Sync`.
pub trait ExecutionObserver: Send + Sync {
    /// Called before a node is dispatched.
    fn on_node_start(&self, event: &NodeEvent<'_>);
    /// Hook for a node that finished; `output` is the value it produced.
    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
    /// Hook for a node that failed with `error`.
    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
}
680
/// Borrowed view of a node execution, handed to an `ExecutionObserver`.
pub struct NodeEvent<'a> {
    /// Context of the flow the node belongs to.
    pub context: &'a FlowContext<'a>,
    /// Id of the node.
    pub node_id: &'a str,
    /// The node definition.
    pub node: &'a HostNode,
    /// Payload the node is dispatched with.
    pub payload: &'a Value,
}
687
/// Mutable state of a flow run; serializable so it can travel in a snapshot.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionState {
    /// Input the flow was started (or last resumed) with.
    input: Value,
    /// Outputs of the nodes executed so far, keyed by node id.
    nodes: HashMap<String, NodeOutput>,
    /// Payloads pushed by emit nodes and not yet returned to the caller.
    egress: Vec<Value>,
}
694
695impl ExecutionState {
696    fn new(input: Value) -> Self {
697        Self {
698            input,
699            nodes: HashMap::new(),
700            egress: Vec::new(),
701        }
702    }
703
704    fn context(&self) -> Value {
705        let mut nodes = JsonMap::new();
706        for (id, output) in &self.nodes {
707            nodes.insert(
708                id.clone(),
709                json!({
710                    "ok": output.ok,
711                    "payload": output.payload.clone(),
712                    "meta": output.meta.clone(),
713                }),
714            );
715        }
716        json!({
717            "input": self.input.clone(),
718            "nodes": nodes,
719        })
720    }
721    fn push_egress(&mut self, payload: Value) {
722        self.egress.push(payload);
723    }
724
725    fn replace_input(&mut self, input: Value) {
726        self.input = input;
727    }
728
729    fn clear_egress(&mut self) {
730        self.egress.clear();
731    }
732
733    fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
734        if self.egress.is_empty() {
735            return final_payload.unwrap_or(Value::Null);
736        }
737        let mut emitted = std::mem::take(&mut self.egress);
738        if let Some(value) = final_payload {
739            match value {
740                Value::Null => {}
741                Value::Array(items) => emitted.extend(items),
742                other => emitted.push(other),
743            }
744        }
745        Value::Array(emitted)
746    }
747}
748
/// Recorded result of one executed node.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct NodeOutput {
    /// Success flag; `NodeOutput::new` always sets it to `true`.
    ok: bool,
    /// Value the node produced.
    payload: Value,
    /// Extra metadata; `NodeOutput::new` sets it to JSON null.
    meta: Value,
}
755
756impl NodeOutput {
757    fn new(payload: Value) -> Self {
758        Self {
759            ok: true,
760            payload,
761            meta: Value::Null,
762        }
763    }
764}
765
/// What dispatching a node produced.
struct DispatchOutcome {
    /// Output of the node.
    output: NodeOutput,
    /// Set when the flow must pause after this node.
    wait_reason: Option<String>,
}
770
771impl DispatchOutcome {
772    fn complete(output: NodeOutput) -> Self {
773        Self {
774            output,
775            wait_reason: None,
776        }
777    }
778
779    fn wait(output: NodeOutput, reason: Option<String>) -> Self {
780        Self {
781            output,
782            wait_reason: reason,
783        }
784    }
785}
786
787fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
788    ComponentExecCtx {
789        tenant: ComponentTenantCtx {
790            tenant: ctx.tenant.to_string(),
791            team: None,
792            user: ctx.provider_id.map(str::to_string),
793            trace_id: None,
794            correlation_id: ctx.session_id.map(str::to_string),
795            deadline_unix_ms: None,
796            attempt: 1,
797            idempotency_key: ctx.session_id.map(str::to_string),
798        },
799        flow_id: ctx.flow_id.to_string(),
800        node_id: Some(node_id.to_string()),
801    }
802}
803
804fn extract_wait_reason(payload: &Value) -> Option<String> {
805    match payload {
806        Value::String(s) => Some(s.clone()),
807        Value::Object(map) => map
808            .get("reason")
809            .and_then(Value::as_str)
810            .map(|value| value.to_string()),
811        _ => None,
812    }
813}
814
815fn mapping_ctx(root: Value, state: &ExecutionState) -> Value {
816    json!({
817        "input": root.clone(),
818        "result": root,
819        "state": state.context(),
820    })
821}
822
823fn apply_mapping(template: &Value, ctx: &Value) -> Result<Value> {
824    match template {
825        Value::String(path) if path.starts_with('/') => ctx
826            .pointer(path)
827            .cloned()
828            .ok_or_else(|| anyhow!("mapping path `{path}` not found")),
829        Value::Array(items) => {
830            let mut mapped = Vec::with_capacity(items.len());
831            for item in items {
832                mapped.push(apply_mapping(item, ctx)?);
833            }
834            Ok(Value::Array(mapped))
835        }
836        Value::Object(map) => {
837            let mut out = serde_json::Map::new();
838            for (key, value) in map {
839                out.insert(key.clone(), apply_mapping(value, ctx)?);
840            }
841            Ok(Value::Object(out))
842        }
843        other => Ok(other.clone()),
844    }
845}
846
847impl From<Flow> for HostFlow {
848    fn from(value: Flow) -> Self {
849        let mut nodes = IndexMap::new();
850        for (id, node) in value.nodes {
851            nodes.insert(id.clone(), HostNode::from(node));
852        }
853        let start = value
854            .entrypoints
855            .get("default")
856            .and_then(Value::as_str)
857            .and_then(|id| NodeId::from_str(id).ok())
858            .or_else(|| nodes.keys().next().cloned());
859        Self {
860            id: value.id.as_str().to_string(),
861            start,
862            nodes,
863        }
864    }
865}
866
impl From<Node> for HostNode {
    /// Classifies a flow node and converts it into its host representation.
    ///
    /// A node is exec-style when its component id or its operation is
    /// `component.exec`. Exec-style nodes whose resolved target starts with
    /// `emit.` become builtin emits, as do nodes whose operation or component
    /// id starts with `emit.`. `flow.call`, `provider.invoke` and
    /// `session.wait` map to their dedicated kinds; everything else is a
    /// direct pack component call.
    fn from(node: Node) -> Self {
        let component_ref = node.component.id.as_str().to_string();
        let raw_operation = node.component.operation.clone();
        let operation_in_mapping = extract_operation_from_mapping(&node.input.mapping);
        let operation_is_component_exec = raw_operation.as_deref() == Some("component.exec");
        let operation_is_emit = raw_operation
            .as_deref()
            .map(|op| op.starts_with("emit."))
            .unwrap_or(false);
        let is_component_exec = component_ref == "component.exec" || operation_is_component_exec;

        let kind = if is_component_exec {
            // Resolve which component the exec node actually targets.
            let target = if component_ref == "component.exec" {
                // An `emit.*` operation on a `component.exec` component is itself the target.
                if let Some(op) = raw_operation
                    .as_deref()
                    .filter(|op| op.starts_with("emit."))
                {
                    op.to_string()
                } else {
                    // Otherwise the mapping names the target; fall back to the marker itself.
                    extract_target_component(&node.input.mapping)
                        .unwrap_or_else(|| "component.exec".to_string())
                }
            } else {
                // The operation is `component.exec`: prefer the target named in
                // the mapping, else the node's own component id.
                extract_target_component(&node.input.mapping)
                    .unwrap_or_else(|| component_ref.clone())
            };
            if target.starts_with("emit.") {
                NodeKind::BuiltinEmit {
                    kind: emit_kind_from_ref(&target),
                }
            } else {
                NodeKind::Exec {
                    target_component: target,
                }
            }
        } else if operation_is_emit {
            NodeKind::BuiltinEmit {
                // `operation_is_emit` implies the operation is present, so the
                // `emit.log` fallback is not expected to be used.
                kind: emit_kind_from_ref(raw_operation.as_deref().unwrap_or("emit.log")),
            }
        } else {
            match component_ref.as_str() {
                "flow.call" => NodeKind::FlowCall,
                "provider.invoke" => NodeKind::ProviderInvoke,
                "session.wait" => NodeKind::Wait,
                comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
                    kind: emit_kind_from_ref(comp),
                },
                other => NodeKind::PackComponent {
                    component_ref: other.to_string(),
                },
            }
        };
        // Label exposed to observers through the public `component` field.
        let component_label = match &kind {
            NodeKind::Exec { .. } => "component.exec".to_string(),
            NodeKind::PackComponent { component_ref } => component_ref.clone(),
            NodeKind::ProviderInvoke => "provider.invoke".to_string(),
            NodeKind::FlowCall => "flow.call".to_string(),
            NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
            NodeKind::Wait => "session.wait".to_string(),
        };
        // The literal `component.exec` operation is only a marker, not a real operation name.
        let operation_name = if is_component_exec && operation_is_component_exec {
            None
        } else {
            raw_operation.clone()
        };
        // Emit nodes carry only the extracted emit payload; all others keep the full mapping.
        let payload_expr = match kind {
            NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
            _ => node.input.mapping.clone(),
        };
        Self {
            kind,
            component: component_label,
            component_id: if is_component_exec {
                "component.exec".to_string()
            } else {
                component_ref
            },
            operation_name,
            operation_in_mapping,
            payload_expr,
            routing: node.routing,
        }
    }
}
952
953fn extract_target_component(payload: &Value) -> Option<String> {
954    match payload {
955        Value::Object(map) => map
956            .get("component")
957            .or_else(|| map.get("component_ref"))
958            .and_then(Value::as_str)
959            .map(|s| s.to_string()),
960        _ => None,
961    }
962}
963
964fn extract_operation_from_mapping(payload: &Value) -> Option<String> {
965    match payload {
966        Value::Object(map) => map
967            .get("operation")
968            .or_else(|| map.get("op"))
969            .and_then(Value::as_str)
970            .map(str::trim)
971            .filter(|value| !value.is_empty())
972            .map(|value| value.to_string()),
973        _ => None,
974    }
975}
976
977fn extract_emit_payload(payload: &Value) -> Value {
978    if let Value::Object(map) = payload {
979        if let Some(input) = map.get("input") {
980            return input.clone();
981        }
982        if let Some(inner) = map.get("payload") {
983            return inner.clone();
984        }
985    }
986    payload.clone()
987}
988
989fn split_operation_payload(payload: Value) -> (Value, Value) {
990    if let Value::Object(mut map) = payload.clone()
991        && map.contains_key("input")
992    {
993        let input = map.remove("input").unwrap_or(Value::Null);
994        let config = map.remove("config").unwrap_or(Value::Null);
995        let legacy_only = map.keys().all(|key| {
996            matches!(
997                key.as_str(),
998                "operation" | "op" | "component" | "component_ref"
999            )
1000        });
1001        if legacy_only {
1002            return (input, config);
1003        }
1004    }
1005    (payload, Value::Null)
1006}
1007
1008fn resolve_component_operation(
1009    node_id: &str,
1010    component_label: &str,
1011    payload_operation: Option<String>,
1012    operation_override: Option<&str>,
1013    operation_in_mapping: Option<&str>,
1014) -> Result<String> {
1015    if let Some(op) = operation_override
1016        .map(str::trim)
1017        .filter(|value| !value.is_empty())
1018    {
1019        return Ok(op.to_string());
1020    }
1021
1022    if let Some(op) = payload_operation
1023        .as_deref()
1024        .map(str::trim)
1025        .filter(|value| !value.is_empty())
1026    {
1027        return Ok(op.to_string());
1028    }
1029
1030    let mut message = format!(
1031        "missing operation for node `{}` (component `{}`); expected node.component.operation to be set",
1032        node_id, component_label,
1033    );
1034    if let Some(found) = operation_in_mapping {
1035        message.push_str(&format!(
1036            ". Found operation in input.mapping (`{}`) but this is not used; pack compiler must preserve node.component.operation.",
1037            found
1038        ));
1039    }
1040    bail!(message);
1041}
1042
1043fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
1044    match component_ref {
1045        "emit.log" => EmitKind::Log,
1046        "emit.response" => EmitKind::Response,
1047        other => EmitKind::Other(other.to_string()),
1048    }
1049}
1050
1051fn emit_ref_from_kind(kind: &EmitKind) -> String {
1052    match kind {
1053        EmitKind::Log => "emit.log".to_string(),
1054        EmitKind::Response => "emit.response".to_string(),
1055        EmitKind::Other(other) => other.clone(),
1056    }
1057}
1058
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tokio::runtime::Runtime;

    /// Builds an engine with no packs, no flows and an empty flow cache.
    fn minimal_engine() -> FlowEngine {
        FlowEngine {
            packs: Vec::new(),
            flows: Vec::new(),
            flow_sources: HashMap::new(),
            flow_cache: RwLock::new(HashMap::new()),
            default_env: "local".to_string(),
        }
    }

    /// Runs `execute_component_exec` for an exec node that has no operation
    /// configured and returns the resulting error message.
    ///
    /// `operation_in_mapping` is stored on the node so the hint branch of the
    /// error message can be exercised.
    fn missing_operation_message(node_id: &str, operation_in_mapping: Option<&str>) -> String {
        let engine = minimal_engine();
        let runtime = Runtime::new().unwrap();
        let ctx = FlowContext {
            tenant: "tenant",
            flow_id: "flow",
            node_id: Some(node_id),
            tool: None,
            action: None,
            session_id: None,
            provider_id: None,
            retry_config: RetryConfig {
                max_attempts: 1,
                base_delay_ms: 1,
            },
            observer: None,
            mocks: None,
        };
        let node = HostNode {
            kind: NodeKind::Exec {
                target_component: "qa.process".into(),
            },
            component: "component.exec".into(),
            component_id: "component.exec".into(),
            operation_name: None,
            operation_in_mapping: operation_in_mapping.map(str::to_owned),
            payload_expr: Value::Null,
            routing: Routing::End,
        };
        let state = ExecutionState::new(Value::Null);
        let payload = json!({ "component": "qa.process" });
        let overrides = ComponentOverrides {
            component: None,
            operation: None,
        };
        let outcome = runtime.block_on(engine.execute_component_exec(
            &ctx, node_id, &node, &state, payload, overrides,
        ));
        outcome.unwrap_err().to_string()
    }

    #[test]
    fn templating_renders_with_partials_and_data() {
        let mut state = ExecutionState::new(json!({ "city": "London" }));
        let forecast = NodeOutput::new(json!({ "temp": "20C" }));
        state.nodes.insert("forecast".to_string(), forecast);

        // templating handled via component now; ensure context still includes node outputs
        let ctx = state.context();
        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
    }

    #[test]
    fn finalize_wraps_emitted_payloads() {
        let mut state = ExecutionState::new(json!({}));
        for text in ["first", "second"] {
            state.push_egress(json!({ "text": text }));
        }
        let finalized = state.finalize_with(Some(json!({ "text": "final" })));
        let expected = json!([
            { "text": "first" },
            { "text": "second" },
            { "text": "final" }
        ]);
        assert_eq!(finalized, expected);
    }

    #[test]
    fn finalize_flattens_final_array() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "only" }));
        let trailing = json!([
            { "text": "extra-1" },
            { "text": "extra-2" }
        ]);
        let finalized = state.finalize_with(Some(trailing));
        let expected = json!([
            { "text": "only" },
            { "text": "extra-1" },
            { "text": "extra-2" }
        ]);
        assert_eq!(finalized, expected);
    }

    #[test]
    fn missing_operation_reports_node_and_component() {
        let message = missing_operation_message("missing-op", None);
        assert!(
            message.contains("missing operation for node `missing-op`"),
            "unexpected message: {message}"
        );
        assert!(
            message.contains("(component `component.exec`)"),
            "unexpected message: {message}"
        );
    }

    #[test]
    fn missing_operation_mentions_mapping_hint() {
        let message = missing_operation_message("missing-op-hint", Some("render"));
        assert!(
            message.contains("missing operation for node `missing-op-hint`"),
            "unexpected message: {message}"
        );
        assert!(
            message.contains("Found operation in input.mapping (`render`)"),
            "unexpected message: {message}"
        );
    }
}
1236
1237use tracing::Instrument;
1238
/// Borrowed context describing a single flow invocation.
///
/// Every field is either a borrow tied to `'a` or a `Copy` value, so the
/// struct owns no heap data. The tests in this file pass it by reference to
/// `execute_component_exec`.
pub struct FlowContext<'a> {
    /// Tenant identifier for the invocation.
    pub tenant: &'a str,
    /// Identifier of the flow being executed.
    pub flow_id: &'a str,
    /// Node identifier, when one applies.
    pub node_id: Option<&'a str>,
    /// Optional tool name — NOTE(review): semantics not visible here; confirm with callers.
    pub tool: Option<&'a str>,
    /// Optional action name — NOTE(review): semantics not visible here; confirm with callers.
    pub action: Option<&'a str>,
    /// Optional session identifier.
    pub session_id: Option<&'a str>,
    /// Optional provider identifier.
    pub provider_id: Option<&'a str>,
    /// Retry settings for this invocation.
    pub retry_config: RetryConfig,
    /// Optional borrowed execution observer.
    pub observer: Option<&'a dyn ExecutionObserver>,
    /// Optional borrowed mock layer — presumably used to stub calls in tests; verify against callers.
    pub mocks: Option<&'a MockLayer>,
}
1251
/// Retry settings carried by a `FlowContext`.
///
/// When built from a `FlowRetryConfig` via `From`, `max_attempts` is clamped
/// to at least 1 and `base_delay_ms` to at least 50. Values constructed
/// directly are not clamped.
#[derive(Copy, Clone, Debug)]
pub struct RetryConfig {
    /// Maximum number of attempts.
    pub max_attempts: u32,
    /// Base delay in milliseconds — presumably the seed for backoff between
    /// attempts; confirm against the retry loop.
    pub base_delay_ms: u64,
}
1257
1258fn should_retry(err: &anyhow::Error) -> bool {
1259    let lower = err.to_string().to_lowercase();
1260    lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
1261}
1262
1263impl From<FlowRetryConfig> for RetryConfig {
1264    fn from(value: FlowRetryConfig) -> Self {
1265        Self {
1266            max_attempts: value.max_attempts.max(1),
1267            base_delay_ms: value.base_delay_ms.max(50),
1268        }
1269    }
1270}