Skip to main content

greentic_runner_host/runner/
engine.rs

1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use crate::config::{FlowRetryConfig, HostConfig};
18use crate::pack::{FlowDescriptor, PackRuntime};
19use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
20use greentic_types::{Flow, Node, NodeId, Routing};
21
22pub struct FlowEngine {
23    packs: Vec<Arc<PackRuntime>>,
24    flows: Vec<FlowDescriptor>,
25    flow_sources: HashMap<String, usize>,
26    flow_cache: RwLock<HashMap<String, HostFlow>>,
27    default_env: String,
28}
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
31pub struct FlowSnapshot {
32    pub flow_id: String,
33    pub next_node: String,
34    pub state: ExecutionState,
35}
36
37#[derive(Clone, Debug)]
38pub struct FlowWait {
39    pub reason: Option<String>,
40    pub snapshot: FlowSnapshot,
41}
42
43#[derive(Clone, Debug)]
44pub enum FlowStatus {
45    Completed,
46    Waiting(FlowWait),
47}
48
49#[derive(Clone, Debug)]
50pub struct FlowExecution {
51    pub output: Value,
52    pub status: FlowStatus,
53}
54
55#[derive(Clone, Debug)]
56struct HostFlow {
57    id: String,
58    start: Option<NodeId>,
59    nodes: IndexMap<NodeId, HostNode>,
60}
61
62#[derive(Clone, Debug)]
63pub struct HostNode {
64    kind: NodeKind,
65    /// Backwards-compatible component label for observers/transcript.
66    pub component: String,
67    component_id: String,
68    operation_name: Option<String>,
69    operation_in_mapping: Option<String>,
70    payload_expr: Value,
71    routing: Routing,
72}
73
74#[derive(Clone, Debug)]
75enum NodeKind {
76    Exec { target_component: String },
77    PackComponent { component_ref: String },
78    ProviderInvoke,
79    FlowCall,
80    BuiltinEmit { kind: EmitKind },
81    Wait,
82}
83
84#[derive(Clone, Debug)]
85enum EmitKind {
86    Log,
87    Response,
88    Other(String),
89}
90
91struct ComponentOverrides<'a> {
92    component: Option<&'a str>,
93    operation: Option<&'a str>,
94}
95
96struct ComponentCall {
97    component_ref: String,
98    operation: String,
99    input: Value,
100    config: Value,
101}
102
103impl FlowExecution {
104    fn completed(output: Value) -> Self {
105        Self {
106            output,
107            status: FlowStatus::Completed,
108        }
109    }
110
111    fn waiting(output: Value, wait: FlowWait) -> Self {
112        Self {
113            output,
114            status: FlowStatus::Waiting(wait),
115        }
116    }
117}
118
119impl FlowEngine {
120    pub async fn new(packs: Vec<Arc<PackRuntime>>, _config: Arc<HostConfig>) -> Result<Self> {
121        let mut flow_sources = HashMap::new();
122        let mut descriptors = Vec::new();
123        for (idx, pack) in packs.iter().enumerate() {
124            let flows = pack.list_flows().await?;
125            for flow in flows {
126                tracing::info!(
127                    flow_id = %flow.id,
128                    flow_type = %flow.flow_type,
129                    pack_index = idx,
130                    "registered flow"
131                );
132                flow_sources.insert(flow.id.clone(), idx);
133                descriptors.retain(|existing: &FlowDescriptor| existing.id != flow.id);
134                descriptors.push(flow);
135            }
136        }
137
138        let mut flow_map = HashMap::new();
139        for flow in &descriptors {
140            if let Some(&pack_idx) = flow_sources.get(&flow.id) {
141                let pack_clone = Arc::clone(&packs[pack_idx]);
142                let flow_id = flow.id.clone();
143                let task_flow_id = flow_id.clone();
144                match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
145                    Ok(Ok(flow)) => {
146                        flow_map.insert(flow_id, HostFlow::from(flow));
147                    }
148                    Ok(Err(err)) => {
149                        tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
150                    }
151                    Err(err) => {
152                        tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
153                    }
154                }
155            }
156        }
157
158        Ok(Self {
159            packs,
160            flows: descriptors,
161            flow_sources,
162            flow_cache: RwLock::new(flow_map),
163            default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
164        })
165    }
166
167    async fn get_or_load_flow(&self, flow_id: &str) -> Result<HostFlow> {
168        if let Some(flow) = self.flow_cache.read().get(flow_id).cloned() {
169            return Ok(flow);
170        }
171
172        let pack_idx = *self
173            .flow_sources
174            .get(flow_id)
175            .with_context(|| format!("flow {flow_id} not registered"))?;
176        let pack = Arc::clone(&self.packs[pack_idx]);
177        let flow_id_owned = flow_id.to_string();
178        let task_flow_id = flow_id_owned.clone();
179        let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
180            .await
181            .context("failed to join flow metadata task")??;
182        let host_flow = HostFlow::from(flow);
183        self.flow_cache
184            .write()
185            .insert(flow_id_owned.clone(), host_flow.clone());
186        Ok(host_flow)
187    }
188
189    pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
190        let span = tracing::info_span!(
191            "flow.execute",
192            tenant = tracing::field::Empty,
193            flow_id = tracing::field::Empty,
194            node_id = tracing::field::Empty,
195            tool = tracing::field::Empty,
196            action = tracing::field::Empty
197        );
198        annotate_span(
199            &span,
200            &FlowSpanAttributes {
201                tenant: ctx.tenant,
202                flow_id: ctx.flow_id,
203                node_id: ctx.node_id,
204                tool: ctx.tool,
205                action: ctx.action,
206            },
207        );
208        set_flow_context(
209            &self.default_env,
210            ctx.tenant,
211            ctx.flow_id,
212            ctx.node_id,
213            ctx.provider_id,
214            ctx.session_id,
215        );
216        let retry_config = ctx.retry_config;
217        let original_input = input;
218        async move {
219            let mut attempt = 0u32;
220            loop {
221                attempt += 1;
222                match self.execute_once(&ctx, original_input.clone()).await {
223                    Ok(value) => return Ok(value),
224                    Err(err) => {
225                        if attempt >= retry_config.max_attempts || !should_retry(&err) {
226                            return Err(err);
227                        }
228                        let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
229                        tracing::warn!(
230                            tenant = ctx.tenant,
231                            flow_id = ctx.flow_id,
232                            attempt,
233                            max_attempts = retry_config.max_attempts,
234                            delay_ms = delay,
235                            error = %err,
236                            "transient flow execution failure, backing off"
237                        );
238                        tokio::time::sleep(Duration::from_millis(delay)).await;
239                    }
240                }
241            }
242        }
243        .instrument(span)
244        .await
245    }
246
247    pub async fn resume(
248        &self,
249        ctx: FlowContext<'_>,
250        snapshot: FlowSnapshot,
251        input: Value,
252    ) -> Result<FlowExecution> {
253        if snapshot.flow_id != ctx.flow_id {
254            bail!(
255                "snapshot flow {} does not match requested {}",
256                snapshot.flow_id,
257                ctx.flow_id
258            );
259        }
260        let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
261        let mut state = snapshot.state;
262        state.replace_input(input);
263        self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
264            .await
265    }
266
267    async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
268        let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
269        let state = ExecutionState::new(input);
270        self.drive_flow(ctx, flow_ir, state, None).await
271    }
272
273    async fn drive_flow(
274        &self,
275        ctx: &FlowContext<'_>,
276        flow_ir: HostFlow,
277        mut state: ExecutionState,
278        resume_from: Option<String>,
279    ) -> Result<FlowExecution> {
280        let mut current = match resume_from {
281            Some(node) => NodeId::from_str(&node)
282                .with_context(|| format!("invalid resume node id `{node}`"))?,
283            None => flow_ir
284                .start
285                .clone()
286                .or_else(|| flow_ir.nodes.keys().next().cloned())
287                .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
288        };
289
290        loop {
291            let node = flow_ir
292                .nodes
293                .get(&current)
294                .with_context(|| format!("node {} not found", current.as_str()))?;
295
296            let payload = node.payload_expr.clone();
297            let observed_payload = payload.clone();
298            let node_id = current.clone();
299            let event = NodeEvent {
300                context: ctx,
301                node_id: node_id.as_str(),
302                node,
303                payload: &observed_payload,
304            };
305            if let Some(observer) = ctx.observer {
306                observer.on_node_start(&event);
307            }
308            let DispatchOutcome {
309                output,
310                wait_reason,
311            } = self
312                .dispatch_node(ctx, node_id.as_str(), node, &mut state, payload)
313                .await?;
314
315            state.nodes.insert(node_id.clone().into(), output.clone());
316            if let Some(observer) = ctx.observer {
317                observer.on_node_end(&event, &output.payload);
318            }
319
320            let (next, should_exit) = match &node.routing {
321                Routing::Next { node_id } => (Some(node_id.clone()), false),
322                Routing::End | Routing::Reply => (None, true),
323                Routing::Branch { default, .. } => (default.clone(), default.is_none()),
324                Routing::Custom(raw) => {
325                    tracing::warn!(
326                        flow_id = %flow_ir.id,
327                        node_id = %node_id,
328                        routing = ?raw,
329                        "unsupported routing; terminating flow"
330                    );
331                    (None, true)
332                }
333            };
334
335            if let Some(wait_reason) = wait_reason {
336                let resume_target = next.clone().ok_or_else(|| {
337                    anyhow!(
338                        "session.wait node {} requires a non-empty route",
339                        current.as_str()
340                    )
341                })?;
342                let mut snapshot_state = state.clone();
343                snapshot_state.clear_egress();
344                let snapshot = FlowSnapshot {
345                    flow_id: ctx.flow_id.to_string(),
346                    next_node: resume_target.as_str().to_string(),
347                    state: snapshot_state,
348                };
349                let output_value = state.clone().finalize_with(None);
350                return Ok(FlowExecution::waiting(
351                    output_value,
352                    FlowWait {
353                        reason: Some(wait_reason),
354                        snapshot,
355                    },
356                ));
357            }
358
359            if should_exit {
360                return Ok(FlowExecution::completed(
361                    state.finalize_with(Some(output.payload.clone())),
362                ));
363            }
364
365            match next {
366                Some(n) => current = n,
367                None => {
368                    return Ok(FlowExecution::completed(
369                        state.finalize_with(Some(output.payload.clone())),
370                    ));
371                }
372            }
373        }
374    }
375
376    async fn dispatch_node(
377        &self,
378        ctx: &FlowContext<'_>,
379        node_id: &str,
380        node: &HostNode,
381        state: &mut ExecutionState,
382        payload: Value,
383    ) -> Result<DispatchOutcome> {
384        match &node.kind {
385            NodeKind::Exec { target_component } => self
386                .execute_component_exec(
387                    ctx,
388                    node_id,
389                    node,
390                    state,
391                    payload,
392                    ComponentOverrides {
393                        component: Some(target_component.as_str()),
394                        operation: node.operation_name.as_deref(),
395                    },
396                )
397                .await
398                .map(DispatchOutcome::complete),
399            NodeKind::PackComponent { component_ref } => self
400                .execute_component_call(ctx, node_id, node, state, payload, component_ref.as_str())
401                .await
402                .map(DispatchOutcome::complete),
403            NodeKind::FlowCall => self
404                .execute_flow_call(ctx, payload)
405                .await
406                .map(DispatchOutcome::complete),
407            NodeKind::ProviderInvoke => self
408                .execute_provider_invoke(ctx, node_id, state, payload)
409                .await
410                .map(DispatchOutcome::complete),
411            NodeKind::BuiltinEmit { kind } => {
412                match kind {
413                    EmitKind::Log | EmitKind::Response => {}
414                    EmitKind::Other(component) => {
415                        tracing::debug!(%component, "handling emit.* as builtin");
416                    }
417                }
418                state.push_egress(payload.clone());
419                Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
420            }
421            NodeKind::Wait => {
422                let reason = extract_wait_reason(&payload);
423                Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
424            }
425        }
426    }
427
428    async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
429        #[derive(Deserialize)]
430        struct FlowCallPayload {
431            #[serde(alias = "flow")]
432            flow_id: String,
433            #[serde(default)]
434            input: Value,
435        }
436
437        let call: FlowCallPayload =
438            serde_json::from_value(payload).context("invalid payload for flow.call node")?;
439        if call.flow_id.trim().is_empty() {
440            bail!("flow.call requires a non-empty flow_id");
441        }
442
443        let sub_input = if call.input.is_null() {
444            Value::Null
445        } else {
446            call.input
447        };
448
449        let flow_id_owned = call.flow_id;
450        let action = "flow.call";
451        let sub_ctx = FlowContext {
452            tenant: ctx.tenant,
453            flow_id: flow_id_owned.as_str(),
454            node_id: None,
455            tool: ctx.tool,
456            action: Some(action),
457            session_id: ctx.session_id,
458            provider_id: ctx.provider_id,
459            retry_config: ctx.retry_config,
460            observer: ctx.observer,
461            mocks: ctx.mocks,
462        };
463
464        let execution = Box::pin(self.execute(sub_ctx, sub_input))
465            .await
466            .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
467        match execution.status {
468            FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
469            FlowStatus::Waiting(wait) => bail!(
470                "flow.call cannot pause (flow {} waiting {:?})",
471                flow_id_owned,
472                wait.reason
473            ),
474        }
475    }
476
477    async fn execute_component_exec(
478        &self,
479        ctx: &FlowContext<'_>,
480        node_id: &str,
481        node: &HostNode,
482        state: &ExecutionState,
483        payload: Value,
484        overrides: ComponentOverrides<'_>,
485    ) -> Result<NodeOutput> {
486        #[derive(Deserialize)]
487        struct ComponentPayload {
488            #[serde(default, alias = "component_ref", alias = "component")]
489            component: Option<String>,
490            #[serde(alias = "op")]
491            operation: Option<String>,
492            #[serde(default)]
493            input: Value,
494            #[serde(default)]
495            config: Value,
496        }
497
498        let payload: ComponentPayload =
499            serde_json::from_value(payload).context("invalid payload for component.exec")?;
500        let component_ref = overrides
501            .component
502            .map(str::to_string)
503            .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
504            .with_context(|| "component.exec requires a component_ref")?;
505        let operation = resolve_component_operation(
506            node_id,
507            node.component_id.as_str(),
508            payload.operation,
509            overrides.operation,
510            node.operation_in_mapping.as_deref(),
511        )?;
512        let call = ComponentCall {
513            component_ref,
514            operation,
515            input: payload.input,
516            config: payload.config,
517        };
518
519        self.invoke_component_call(ctx, node_id, state, call).await
520    }
521
522    async fn execute_component_call(
523        &self,
524        ctx: &FlowContext<'_>,
525        node_id: &str,
526        node: &HostNode,
527        state: &ExecutionState,
528        payload: Value,
529        component_ref: &str,
530    ) -> Result<NodeOutput> {
531        let payload_operation = extract_operation_from_mapping(&payload);
532        let (input, config) = split_operation_payload(payload);
533        let operation = resolve_component_operation(
534            node_id,
535            node.component_id.as_str(),
536            payload_operation,
537            node.operation_name.as_deref(),
538            node.operation_in_mapping.as_deref(),
539        )?;
540        let call = ComponentCall {
541            component_ref: component_ref.to_string(),
542            operation,
543            input,
544            config,
545        };
546        self.invoke_component_call(ctx, node_id, state, call).await
547    }
548
549    async fn invoke_component_call(
550        &self,
551        ctx: &FlowContext<'_>,
552        node_id: &str,
553        state: &ExecutionState,
554        mut call: ComponentCall,
555    ) -> Result<NodeOutput> {
556        if let Value::Object(ref mut map) = call.input {
557            map.entry("state".to_string())
558                .or_insert_with(|| state.context());
559        }
560        let input_json = serde_json::to_string(&call.input)?;
561        let config_json = if call.config.is_null() {
562            None
563        } else {
564            Some(serde_json::to_string(&call.config)?)
565        };
566
567        let pack_idx = *self
568            .flow_sources
569            .get(ctx.flow_id)
570            .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
571        let pack = Arc::clone(&self.packs[pack_idx]);
572        let exec_ctx = component_exec_ctx(ctx, node_id);
573        let value = pack
574            .invoke_component(
575                call.component_ref.as_str(),
576                exec_ctx,
577                call.operation.as_str(),
578                config_json,
579                input_json,
580            )
581            .await?;
582
583        Ok(NodeOutput::new(value))
584    }
585
586    async fn execute_provider_invoke(
587        &self,
588        ctx: &FlowContext<'_>,
589        node_id: &str,
590        state: &ExecutionState,
591        payload: Value,
592    ) -> Result<NodeOutput> {
593        #[derive(Deserialize)]
594        struct ProviderPayload {
595            #[serde(default)]
596            provider_id: Option<String>,
597            #[serde(default)]
598            provider_type: Option<String>,
599            #[serde(default, alias = "operation")]
600            op: Option<String>,
601            #[serde(default)]
602            input: Value,
603            #[serde(default)]
604            in_map: Value,
605            #[serde(default)]
606            out_map: Value,
607            #[serde(default)]
608            err_map: Value,
609        }
610
611        let payload: ProviderPayload =
612            serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
613        let op = payload
614            .op
615            .as_deref()
616            .filter(|v| !v.trim().is_empty())
617            .with_context(|| "provider.invoke requires an op")?
618            .to_string();
619
620        let mut input_value = if !payload.in_map.is_null() {
621            let ctx_value = mapping_ctx(payload.input.clone(), state);
622            apply_mapping(&payload.in_map, &ctx_value)
623                .context("failed to evaluate provider.invoke in_map")?
624        } else if !payload.input.is_null() {
625            payload.input
626        } else {
627            Value::Null
628        };
629
630        if let Value::Object(ref mut map) = input_value {
631            map.entry("state".to_string())
632                .or_insert_with(|| state.context());
633        }
634        let input_json = serde_json::to_vec(&input_value)?;
635
636        let pack_idx = *self
637            .flow_sources
638            .get(ctx.flow_id)
639            .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
640        let pack = Arc::clone(&self.packs[pack_idx]);
641        let binding = pack.resolve_provider(
642            payload.provider_id.as_deref(),
643            payload.provider_type.as_deref(),
644        )?;
645        let exec_ctx = component_exec_ctx(ctx, node_id);
646        let result = pack
647            .invoke_provider(&binding, exec_ctx, &op, input_json)
648            .await?;
649
650        let output = if payload.out_map.is_null() {
651            result
652        } else {
653            let ctx_value = mapping_ctx(result, state);
654            apply_mapping(&payload.out_map, &ctx_value)
655                .context("failed to evaluate provider.invoke out_map")?
656        };
657        let _ = payload.err_map;
658        Ok(NodeOutput::new(output))
659    }
660
661    pub fn flows(&self) -> &[FlowDescriptor] {
662        &self.flows
663    }
664
665    pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
666        self.flows
667            .iter()
668            .find(|descriptor| descriptor.flow_type == flow_type)
669    }
670
671    pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
672        self.flows
673            .iter()
674            .find(|descriptor| descriptor.id == flow_id)
675    }
676}
677
678pub trait ExecutionObserver: Send + Sync {
679    fn on_node_start(&self, event: &NodeEvent<'_>);
680    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
681    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
682}
683
684pub struct NodeEvent<'a> {
685    pub context: &'a FlowContext<'a>,
686    pub node_id: &'a str,
687    pub node: &'a HostNode,
688    pub payload: &'a Value,
689}
690
691#[derive(Clone, Debug, Serialize, Deserialize)]
692pub struct ExecutionState {
693    input: Value,
694    nodes: HashMap<String, NodeOutput>,
695    egress: Vec<Value>,
696}
697
698impl ExecutionState {
699    fn new(input: Value) -> Self {
700        Self {
701            input,
702            nodes: HashMap::new(),
703            egress: Vec::new(),
704        }
705    }
706
707    fn context(&self) -> Value {
708        let mut nodes = JsonMap::new();
709        for (id, output) in &self.nodes {
710            nodes.insert(
711                id.clone(),
712                json!({
713                    "ok": output.ok,
714                    "payload": output.payload.clone(),
715                    "meta": output.meta.clone(),
716                }),
717            );
718        }
719        json!({
720            "input": self.input.clone(),
721            "nodes": nodes,
722        })
723    }
724    fn push_egress(&mut self, payload: Value) {
725        self.egress.push(payload);
726    }
727
728    fn replace_input(&mut self, input: Value) {
729        self.input = input;
730    }
731
732    fn clear_egress(&mut self) {
733        self.egress.clear();
734    }
735
736    fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
737        if self.egress.is_empty() {
738            return final_payload.unwrap_or(Value::Null);
739        }
740        let mut emitted = std::mem::take(&mut self.egress);
741        if let Some(value) = final_payload {
742            match value {
743                Value::Null => {}
744                Value::Array(items) => emitted.extend(items),
745                other => emitted.push(other),
746            }
747        }
748        Value::Array(emitted)
749    }
750}
751
752#[derive(Clone, Debug, Serialize, Deserialize)]
753struct NodeOutput {
754    ok: bool,
755    payload: Value,
756    meta: Value,
757}
758
759impl NodeOutput {
760    fn new(payload: Value) -> Self {
761        Self {
762            ok: true,
763            payload,
764            meta: Value::Null,
765        }
766    }
767}
768
769struct DispatchOutcome {
770    output: NodeOutput,
771    wait_reason: Option<String>,
772}
773
774impl DispatchOutcome {
775    fn complete(output: NodeOutput) -> Self {
776        Self {
777            output,
778            wait_reason: None,
779        }
780    }
781
782    fn wait(output: NodeOutput, reason: Option<String>) -> Self {
783        Self {
784            output,
785            wait_reason: reason,
786        }
787    }
788}
789
790fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
791    ComponentExecCtx {
792        tenant: ComponentTenantCtx {
793            tenant: ctx.tenant.to_string(),
794            team: None,
795            user: ctx.provider_id.map(str::to_string),
796            trace_id: None,
797            correlation_id: ctx.session_id.map(str::to_string),
798            deadline_unix_ms: None,
799            attempt: 1,
800            idempotency_key: ctx.session_id.map(str::to_string),
801        },
802        flow_id: ctx.flow_id.to_string(),
803        node_id: Some(node_id.to_string()),
804    }
805}
806
807fn extract_wait_reason(payload: &Value) -> Option<String> {
808    match payload {
809        Value::String(s) => Some(s.clone()),
810        Value::Object(map) => map
811            .get("reason")
812            .and_then(Value::as_str)
813            .map(|value| value.to_string()),
814        _ => None,
815    }
816}
817
818fn mapping_ctx(root: Value, state: &ExecutionState) -> Value {
819    json!({
820        "input": root.clone(),
821        "result": root,
822        "state": state.context(),
823    })
824}
825
826fn apply_mapping(template: &Value, ctx: &Value) -> Result<Value> {
827    match template {
828        Value::String(path) if path.starts_with('/') => ctx
829            .pointer(path)
830            .cloned()
831            .ok_or_else(|| anyhow!("mapping path `{path}` not found")),
832        Value::Array(items) => {
833            let mut mapped = Vec::with_capacity(items.len());
834            for item in items {
835                mapped.push(apply_mapping(item, ctx)?);
836            }
837            Ok(Value::Array(mapped))
838        }
839        Value::Object(map) => {
840            let mut out = serde_json::Map::new();
841            for (key, value) in map {
842                out.insert(key.clone(), apply_mapping(value, ctx)?);
843            }
844            Ok(Value::Object(out))
845        }
846        other => Ok(other.clone()),
847    }
848}
849
850impl From<Flow> for HostFlow {
851    fn from(value: Flow) -> Self {
852        let mut nodes = IndexMap::new();
853        for (id, node) in value.nodes {
854            nodes.insert(id.clone(), HostNode::from(node));
855        }
856        let start = value
857            .entrypoints
858            .get("default")
859            .and_then(Value::as_str)
860            .and_then(|id| NodeId::from_str(id).ok())
861            .or_else(|| nodes.keys().next().cloned());
862        Self {
863            id: value.id.as_str().to_string(),
864            start,
865            nodes,
866        }
867    }
868}
869
870impl From<Node> for HostNode {
871    fn from(node: Node) -> Self {
872        let component_ref = node.component.id.as_str().to_string();
873        let raw_operation = node.component.operation.clone();
874        let operation_in_mapping = extract_operation_from_mapping(&node.input.mapping);
875        let operation_is_component_exec = raw_operation.as_deref() == Some("component.exec");
876        let operation_is_emit = raw_operation
877            .as_deref()
878            .map(|op| op.starts_with("emit."))
879            .unwrap_or(false);
880        let is_component_exec = component_ref == "component.exec" || operation_is_component_exec;
881
882        let kind = if is_component_exec {
883            let target = if component_ref == "component.exec" {
884                if let Some(op) = raw_operation
885                    .as_deref()
886                    .filter(|op| op.starts_with("emit."))
887                {
888                    op.to_string()
889                } else {
890                    extract_target_component(&node.input.mapping)
891                        .unwrap_or_else(|| "component.exec".to_string())
892                }
893            } else {
894                extract_target_component(&node.input.mapping)
895                    .unwrap_or_else(|| component_ref.clone())
896            };
897            if target.starts_with("emit.") {
898                NodeKind::BuiltinEmit {
899                    kind: emit_kind_from_ref(&target),
900                }
901            } else {
902                NodeKind::Exec {
903                    target_component: target,
904                }
905            }
906        } else if operation_is_emit {
907            NodeKind::BuiltinEmit {
908                kind: emit_kind_from_ref(raw_operation.as_deref().unwrap_or("emit.log")),
909            }
910        } else {
911            match component_ref.as_str() {
912                "flow.call" => NodeKind::FlowCall,
913                "provider.invoke" => NodeKind::ProviderInvoke,
914                "session.wait" => NodeKind::Wait,
915                comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
916                    kind: emit_kind_from_ref(comp),
917                },
918                other => NodeKind::PackComponent {
919                    component_ref: other.to_string(),
920                },
921            }
922        };
923        let component_label = match &kind {
924            NodeKind::Exec { .. } => "component.exec".to_string(),
925            NodeKind::PackComponent { component_ref } => component_ref.clone(),
926            NodeKind::ProviderInvoke => "provider.invoke".to_string(),
927            NodeKind::FlowCall => "flow.call".to_string(),
928            NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
929            NodeKind::Wait => "session.wait".to_string(),
930        };
931        let operation_name = if is_component_exec && operation_is_component_exec {
932            None
933        } else {
934            raw_operation.clone()
935        };
936        let payload_expr = match kind {
937            NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
938            _ => node.input.mapping.clone(),
939        };
940        Self {
941            kind,
942            component: component_label,
943            component_id: if is_component_exec {
944                "component.exec".to_string()
945            } else {
946                component_ref
947            },
948            operation_name,
949            operation_in_mapping,
950            payload_expr,
951            routing: node.routing,
952        }
953    }
954}
955
956fn extract_target_component(payload: &Value) -> Option<String> {
957    match payload {
958        Value::Object(map) => map
959            .get("component")
960            .or_else(|| map.get("component_ref"))
961            .and_then(Value::as_str)
962            .map(|s| s.to_string()),
963        _ => None,
964    }
965}
966
967fn extract_operation_from_mapping(payload: &Value) -> Option<String> {
968    match payload {
969        Value::Object(map) => map
970            .get("operation")
971            .or_else(|| map.get("op"))
972            .and_then(Value::as_str)
973            .map(str::trim)
974            .filter(|value| !value.is_empty())
975            .map(|value| value.to_string()),
976        _ => None,
977    }
978}
979
980fn extract_emit_payload(payload: &Value) -> Value {
981    if let Value::Object(map) = payload {
982        if let Some(input) = map.get("input") {
983            return input.clone();
984        }
985        if let Some(inner) = map.get("payload") {
986            return inner.clone();
987        }
988    }
989    payload.clone()
990}
991
992fn split_operation_payload(payload: Value) -> (Value, Value) {
993    if let Value::Object(mut map) = payload.clone()
994        && map.contains_key("input")
995    {
996        let input = map.remove("input").unwrap_or(Value::Null);
997        let config = map.remove("config").unwrap_or(Value::Null);
998        let legacy_only = map.keys().all(|key| {
999            matches!(
1000                key.as_str(),
1001                "operation" | "op" | "component" | "component_ref"
1002            )
1003        });
1004        if legacy_only {
1005            return (input, config);
1006        }
1007    }
1008    (payload, Value::Null)
1009}
1010
1011fn resolve_component_operation(
1012    node_id: &str,
1013    component_label: &str,
1014    payload_operation: Option<String>,
1015    operation_override: Option<&str>,
1016    operation_in_mapping: Option<&str>,
1017) -> Result<String> {
1018    if let Some(op) = operation_override
1019        .map(str::trim)
1020        .filter(|value| !value.is_empty())
1021    {
1022        return Ok(op.to_string());
1023    }
1024
1025    if let Some(op) = payload_operation
1026        .as_deref()
1027        .map(str::trim)
1028        .filter(|value| !value.is_empty())
1029    {
1030        return Ok(op.to_string());
1031    }
1032
1033    let mut message = format!(
1034        "missing operation for node `{}` (component `{}`); expected node.component.operation to be set",
1035        node_id, component_label,
1036    );
1037    if let Some(found) = operation_in_mapping {
1038        message.push_str(&format!(
1039            ". Found operation in input.mapping (`{}`) but this is not used; pack compiler must preserve node.component.operation.",
1040            found
1041        ));
1042    }
1043    bail!(message);
1044}
1045
1046fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
1047    match component_ref {
1048        "emit.log" => EmitKind::Log,
1049        "emit.response" => EmitKind::Response,
1050        other => EmitKind::Other(other.to_string()),
1051    }
1052}
1053
1054fn emit_ref_from_kind(kind: &EmitKind) -> String {
1055    match kind {
1056        EmitKind::Log => "emit.log".to_string(),
1057        EmitKind::Response => "emit.response".to_string(),
1058        EmitKind::Other(other) => other.clone(),
1059    }
1060}
1061
1062#[cfg(test)]
1063mod tests {
1064    use super::*;
1065    use greentic_types::{
1066        Flow, FlowComponentRef, FlowId, FlowKind, InputMapping, Node, NodeId, OutputMapping,
1067        Routing, TelemetryHints,
1068    };
1069    use serde_json::json;
1070    use std::collections::BTreeMap;
1071    use std::str::FromStr;
1072    use std::sync::Mutex;
1073    use tokio::runtime::Runtime;
1074
1075    fn minimal_engine() -> FlowEngine {
1076        FlowEngine {
1077            packs: Vec::new(),
1078            flows: Vec::new(),
1079            flow_sources: HashMap::new(),
1080            flow_cache: RwLock::new(HashMap::new()),
1081            default_env: "local".to_string(),
1082        }
1083    }
1084
1085    #[test]
1086    fn templating_renders_with_partials_and_data() {
1087        let mut state = ExecutionState::new(json!({ "city": "London" }));
1088        state.nodes.insert(
1089            "forecast".to_string(),
1090            NodeOutput::new(json!({ "temp": "20C" })),
1091        );
1092
1093        // templating handled via component now; ensure context still includes node outputs
1094        let ctx = state.context();
1095        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
1096    }
1097
1098    #[test]
1099    fn finalize_wraps_emitted_payloads() {
1100        let mut state = ExecutionState::new(json!({}));
1101        state.push_egress(json!({ "text": "first" }));
1102        state.push_egress(json!({ "text": "second" }));
1103        let result = state.finalize_with(Some(json!({ "text": "final" })));
1104        assert_eq!(
1105            result,
1106            json!([
1107                { "text": "first" },
1108                { "text": "second" },
1109                { "text": "final" }
1110            ])
1111        );
1112    }
1113
1114    #[test]
1115    fn finalize_flattens_final_array() {
1116        let mut state = ExecutionState::new(json!({}));
1117        state.push_egress(json!({ "text": "only" }));
1118        let result = state.finalize_with(Some(json!([
1119            { "text": "extra-1" },
1120            { "text": "extra-2" }
1121        ])));
1122        assert_eq!(
1123            result,
1124            json!([
1125                { "text": "only" },
1126                { "text": "extra-1" },
1127                { "text": "extra-2" }
1128            ])
1129        );
1130    }
1131
1132    #[test]
1133    fn missing_operation_reports_node_and_component() {
1134        let engine = minimal_engine();
1135        let rt = Runtime::new().unwrap();
1136        let retry_config = RetryConfig {
1137            max_attempts: 1,
1138            base_delay_ms: 1,
1139        };
1140        let ctx = FlowContext {
1141            tenant: "tenant",
1142            flow_id: "flow",
1143            node_id: Some("missing-op"),
1144            tool: None,
1145            action: None,
1146            session_id: None,
1147            provider_id: None,
1148            retry_config,
1149            observer: None,
1150            mocks: None,
1151        };
1152        let node = HostNode {
1153            kind: NodeKind::Exec {
1154                target_component: "qa.process".into(),
1155            },
1156            component: "component.exec".into(),
1157            component_id: "component.exec".into(),
1158            operation_name: None,
1159            operation_in_mapping: None,
1160            payload_expr: Value::Null,
1161            routing: Routing::End,
1162        };
1163        let state = ExecutionState::new(Value::Null);
1164        let payload = json!({ "component": "qa.process" });
1165        let err = rt
1166            .block_on(engine.execute_component_exec(
1167                &ctx,
1168                "missing-op",
1169                &node,
1170                &state,
1171                payload,
1172                ComponentOverrides {
1173                    component: None,
1174                    operation: None,
1175                },
1176            ))
1177            .unwrap_err();
1178        let message = err.to_string();
1179        assert!(
1180            message.contains("missing operation for node `missing-op`"),
1181            "unexpected message: {message}"
1182        );
1183        assert!(
1184            message.contains("(component `component.exec`)"),
1185            "unexpected message: {message}"
1186        );
1187    }
1188
1189    #[test]
1190    fn missing_operation_mentions_mapping_hint() {
1191        let engine = minimal_engine();
1192        let rt = Runtime::new().unwrap();
1193        let retry_config = RetryConfig {
1194            max_attempts: 1,
1195            base_delay_ms: 1,
1196        };
1197        let ctx = FlowContext {
1198            tenant: "tenant",
1199            flow_id: "flow",
1200            node_id: Some("missing-op-hint"),
1201            tool: None,
1202            action: None,
1203            session_id: None,
1204            provider_id: None,
1205            retry_config,
1206            observer: None,
1207            mocks: None,
1208        };
1209        let node = HostNode {
1210            kind: NodeKind::Exec {
1211                target_component: "qa.process".into(),
1212            },
1213            component: "component.exec".into(),
1214            component_id: "component.exec".into(),
1215            operation_name: None,
1216            operation_in_mapping: Some("render".into()),
1217            payload_expr: Value::Null,
1218            routing: Routing::End,
1219        };
1220        let state = ExecutionState::new(Value::Null);
1221        let payload = json!({ "component": "qa.process" });
1222        let err = rt
1223            .block_on(engine.execute_component_exec(
1224                &ctx,
1225                "missing-op-hint",
1226                &node,
1227                &state,
1228                payload,
1229                ComponentOverrides {
1230                    component: None,
1231                    operation: None,
1232                },
1233            ))
1234            .unwrap_err();
1235        let message = err.to_string();
1236        assert!(
1237            message.contains("missing operation for node `missing-op-hint`"),
1238            "unexpected message: {message}"
1239        );
1240        assert!(
1241            message.contains("Found operation in input.mapping (`render`)"),
1242            "unexpected message: {message}"
1243        );
1244    }
1245
1246    struct CountingObserver {
1247        starts: Mutex<Vec<String>>,
1248        ends: Mutex<Vec<Value>>,
1249    }
1250
1251    impl CountingObserver {
1252        fn new() -> Self {
1253            Self {
1254                starts: Mutex::new(Vec::new()),
1255                ends: Mutex::new(Vec::new()),
1256            }
1257        }
1258    }
1259
1260    impl ExecutionObserver for CountingObserver {
1261        fn on_node_start(&self, event: &NodeEvent<'_>) {
1262            self.starts.lock().unwrap().push(event.node_id.to_string());
1263        }
1264
1265        fn on_node_end(&self, _event: &NodeEvent<'_>, output: &Value) {
1266            self.ends.lock().unwrap().push(output.clone());
1267        }
1268
1269        fn on_node_error(&self, _event: &NodeEvent<'_>, _error: &dyn StdError) {}
1270    }
1271
1272    #[test]
1273    fn emits_end_event_for_successful_node() {
1274        let node_id = NodeId::from_str("emit").unwrap();
1275        let node = Node {
1276            id: node_id.clone(),
1277            component: FlowComponentRef {
1278                id: "emit.log".parse().unwrap(),
1279                pack_alias: None,
1280                operation: None,
1281            },
1282            input: InputMapping {
1283                mapping: json!({ "message": "logged" }),
1284            },
1285            output: OutputMapping {
1286                mapping: Value::Null,
1287            },
1288            routing: Routing::End,
1289            telemetry: TelemetryHints::default(),
1290        };
1291        let mut nodes = indexmap::IndexMap::default();
1292        nodes.insert(node_id.clone(), node);
1293        let flow = Flow {
1294            schema_version: "1.0".into(),
1295            id: FlowId::from_str("emit.flow").unwrap(),
1296            kind: FlowKind::Messaging,
1297            entrypoints: BTreeMap::from([(
1298                "default".to_string(),
1299                Value::String(node_id.to_string()),
1300            )]),
1301            nodes,
1302            metadata: Default::default(),
1303        };
1304        let host_flow = HostFlow::from(flow);
1305
1306        let engine = FlowEngine {
1307            packs: Vec::new(),
1308            flows: Vec::new(),
1309            flow_sources: HashMap::new(),
1310            flow_cache: RwLock::new(HashMap::from([("emit.flow".to_string(), host_flow)])),
1311            default_env: "local".to_string(),
1312        };
1313        let observer = CountingObserver::new();
1314        let ctx = FlowContext {
1315            tenant: "demo",
1316            flow_id: "emit.flow",
1317            node_id: None,
1318            tool: None,
1319            action: None,
1320            session_id: None,
1321            provider_id: None,
1322            retry_config: RetryConfig {
1323                max_attempts: 1,
1324                base_delay_ms: 1,
1325            },
1326            observer: Some(&observer),
1327            mocks: None,
1328        };
1329
1330        let rt = Runtime::new().unwrap();
1331        let result = rt.block_on(engine.execute(ctx, Value::Null)).unwrap();
1332        assert!(matches!(result.status, FlowStatus::Completed));
1333
1334        let starts = observer.starts.lock().unwrap();
1335        let ends = observer.ends.lock().unwrap();
1336        assert_eq!(starts.len(), 1);
1337        assert_eq!(ends.len(), 1);
1338        assert_eq!(ends[0], json!({ "message": "logged" }));
1339    }
1340}
1341
1342use tracing::Instrument;
1343
1344pub struct FlowContext<'a> {
1345    pub tenant: &'a str,
1346    pub flow_id: &'a str,
1347    pub node_id: Option<&'a str>,
1348    pub tool: Option<&'a str>,
1349    pub action: Option<&'a str>,
1350    pub session_id: Option<&'a str>,
1351    pub provider_id: Option<&'a str>,
1352    pub retry_config: RetryConfig,
1353    pub observer: Option<&'a dyn ExecutionObserver>,
1354    pub mocks: Option<&'a MockLayer>,
1355}
1356
1357#[derive(Copy, Clone)]
1358pub struct RetryConfig {
1359    pub max_attempts: u32,
1360    pub base_delay_ms: u64,
1361}
1362
1363fn should_retry(err: &anyhow::Error) -> bool {
1364    let lower = err.to_string().to_lowercase();
1365    lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
1366}
1367
1368impl From<FlowRetryConfig> for RetryConfig {
1369    fn from(value: FlowRetryConfig) -> Self {
1370        Self {
1371            max_attempts: value.max_attempts.max(1),
1372            base_delay_ms: value.base_delay_ms.max(50),
1373        }
1374    }
1375}