greentic_runner_host/runner/
engine.rs

1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use super::templating::{TemplateOptions, render_template_value};
18use crate::config::{FlowRetryConfig, HostConfig};
19use crate::pack::{FlowDescriptor, PackRuntime};
20use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
21use greentic_types::{Flow, Node, NodeId, Routing};
22
23pub struct FlowEngine {
24    packs: Vec<Arc<PackRuntime>>,
25    flows: Vec<FlowDescriptor>,
26    flow_sources: HashMap<FlowKey, usize>,
27    flow_cache: RwLock<HashMap<FlowKey, HostFlow>>,
28    default_env: String,
29}
30
31#[derive(Clone, Debug, PartialEq, Eq, Hash)]
32struct FlowKey {
33    pack_id: String,
34    flow_id: String,
35}
36
37#[derive(Clone, Debug, Serialize, Deserialize)]
38pub struct FlowSnapshot {
39    pub pack_id: String,
40    pub flow_id: String,
41    pub next_node: String,
42    pub state: ExecutionState,
43}
44
45#[derive(Clone, Debug)]
46pub struct FlowWait {
47    pub reason: Option<String>,
48    pub snapshot: FlowSnapshot,
49}
50
51#[derive(Clone, Debug)]
52pub enum FlowStatus {
53    Completed,
54    Waiting(Box<FlowWait>),
55}
56
57#[derive(Clone, Debug)]
58pub struct FlowExecution {
59    pub output: Value,
60    pub status: FlowStatus,
61}
62
63#[derive(Clone, Debug)]
64struct HostFlow {
65    id: String,
66    start: Option<NodeId>,
67    nodes: IndexMap<NodeId, HostNode>,
68}
69
70#[derive(Clone, Debug)]
71pub struct HostNode {
72    kind: NodeKind,
73    /// Backwards-compatible component label for observers/transcript.
74    pub component: String,
75    component_id: String,
76    operation_name: Option<String>,
77    operation_in_mapping: Option<String>,
78    payload_expr: Value,
79    routing: Routing,
80}
81
82#[derive(Clone, Debug)]
83enum NodeKind {
84    Exec { target_component: String },
85    PackComponent { component_ref: String },
86    ProviderInvoke,
87    FlowCall,
88    BuiltinEmit { kind: EmitKind },
89    Wait,
90}
91
92#[derive(Clone, Debug)]
93enum EmitKind {
94    Log,
95    Response,
96    Other(String),
97}
98
99struct ComponentOverrides<'a> {
100    component: Option<&'a str>,
101    operation: Option<&'a str>,
102}
103
104struct ComponentCall {
105    component_ref: String,
106    operation: String,
107    input: Value,
108    config: Value,
109}
110
111impl FlowExecution {
112    fn completed(output: Value) -> Self {
113        Self {
114            output,
115            status: FlowStatus::Completed,
116        }
117    }
118
119    fn waiting(output: Value, wait: FlowWait) -> Self {
120        Self {
121            output,
122            status: FlowStatus::Waiting(Box::new(wait)),
123        }
124    }
125}
126
127impl FlowEngine {
128    pub async fn new(packs: Vec<Arc<PackRuntime>>, config: Arc<HostConfig>) -> Result<Self> {
129        let mut flow_sources: HashMap<FlowKey, usize> = HashMap::new();
130        let mut descriptors = Vec::new();
131        let mut bindings = HashMap::new();
132        for pack in &config.pack_bindings {
133            bindings.insert(pack.pack_id.clone(), pack.flows.clone());
134        }
135        let enforce_bindings = !bindings.is_empty();
136        for (idx, pack) in packs.iter().enumerate() {
137            let pack_id = pack.metadata().pack_id.clone();
138            if enforce_bindings && !bindings.contains_key(&pack_id) {
139                bail!("no gtbind entries found for pack {}", pack_id);
140            }
141            let flows = pack.list_flows().await?;
142            let allowed = bindings.get(&pack_id).map(|flows| {
143                flows
144                    .iter()
145                    .cloned()
146                    .collect::<std::collections::HashSet<_>>()
147            });
148            let mut seen = std::collections::HashSet::new();
149            for flow in flows {
150                if let Some(ref allow) = allowed
151                    && !allow.contains(&flow.id)
152                {
153                    continue;
154                }
155                seen.insert(flow.id.clone());
156                tracing::info!(
157                    flow_id = %flow.id,
158                    flow_type = %flow.flow_type,
159                    pack_id = %flow.pack_id,
160                    pack_index = idx,
161                    "registered flow"
162                );
163                flow_sources.insert(
164                    FlowKey {
165                        pack_id: flow.pack_id.clone(),
166                        flow_id: flow.id.clone(),
167                    },
168                    idx,
169                );
170                descriptors.retain(|existing: &FlowDescriptor| {
171                    !(existing.id == flow.id && existing.pack_id == flow.pack_id)
172                });
173                descriptors.push(flow);
174            }
175            if let Some(allow) = allowed {
176                let missing = allow.difference(&seen).cloned().collect::<Vec<_>>();
177                if !missing.is_empty() {
178                    bail!(
179                        "gtbind flow ids missing in pack {}: {}",
180                        pack_id,
181                        missing.join(", ")
182                    );
183                }
184            }
185        }
186
187        let mut flow_map = HashMap::new();
188        for flow in &descriptors {
189            let pack_id = flow.pack_id.clone();
190            if let Some(&pack_idx) = flow_sources.get(&FlowKey {
191                pack_id: pack_id.clone(),
192                flow_id: flow.id.clone(),
193            }) {
194                let pack_clone = Arc::clone(&packs[pack_idx]);
195                let flow_id = flow.id.clone();
196                let task_flow_id = flow_id.clone();
197                match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
198                    Ok(Ok(loaded_flow)) => {
199                        flow_map.insert(
200                            FlowKey {
201                                pack_id: pack_id.clone(),
202                                flow_id,
203                            },
204                            HostFlow::from(loaded_flow),
205                        );
206                    }
207                    Ok(Err(err)) => {
208                        tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
209                    }
210                    Err(err) => {
211                        tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
212                    }
213                }
214            }
215        }
216
217        Ok(Self {
218            packs,
219            flows: descriptors,
220            flow_sources,
221            flow_cache: RwLock::new(flow_map),
222            default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
223        })
224    }
225
226    async fn get_or_load_flow(&self, pack_id: &str, flow_id: &str) -> Result<HostFlow> {
227        let key = FlowKey {
228            pack_id: pack_id.to_string(),
229            flow_id: flow_id.to_string(),
230        };
231        if let Some(flow) = self.flow_cache.read().get(&key).cloned() {
232            return Ok(flow);
233        }
234
235        let pack_idx = *self
236            .flow_sources
237            .get(&key)
238            .with_context(|| format!("flow {pack_id}:{flow_id} not registered"))?;
239        let pack = Arc::clone(&self.packs[pack_idx]);
240        let flow_id_owned = flow_id.to_string();
241        let task_flow_id = flow_id_owned.clone();
242        let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
243            .await
244            .context("failed to join flow metadata task")??;
245        let host_flow = HostFlow::from(flow);
246        self.flow_cache.write().insert(
247            FlowKey {
248                pack_id: pack_id.to_string(),
249                flow_id: flow_id_owned.clone(),
250            },
251            host_flow.clone(),
252        );
253        Ok(host_flow)
254    }
255
256    pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
257        let span = tracing::info_span!(
258            "flow.execute",
259            tenant = tracing::field::Empty,
260            flow_id = tracing::field::Empty,
261            node_id = tracing::field::Empty,
262            tool = tracing::field::Empty,
263            action = tracing::field::Empty
264        );
265        annotate_span(
266            &span,
267            &FlowSpanAttributes {
268                tenant: ctx.tenant,
269                flow_id: ctx.flow_id,
270                node_id: ctx.node_id,
271                tool: ctx.tool,
272                action: ctx.action,
273            },
274        );
275        set_flow_context(
276            &self.default_env,
277            ctx.tenant,
278            ctx.flow_id,
279            ctx.node_id,
280            ctx.provider_id,
281            ctx.session_id,
282        );
283        let retry_config = ctx.retry_config;
284        let original_input = input;
285        async move {
286            let mut attempt = 0u32;
287            loop {
288                attempt += 1;
289                match self.execute_once(&ctx, original_input.clone()).await {
290                    Ok(value) => return Ok(value),
291                    Err(err) => {
292                        if attempt >= retry_config.max_attempts || !should_retry(&err) {
293                            return Err(err);
294                        }
295                        let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
296                        tracing::warn!(
297                            tenant = ctx.tenant,
298                            flow_id = ctx.flow_id,
299                            attempt,
300                            max_attempts = retry_config.max_attempts,
301                            delay_ms = delay,
302                            error = %err,
303                            "transient flow execution failure, backing off"
304                        );
305                        tokio::time::sleep(Duration::from_millis(delay)).await;
306                    }
307                }
308            }
309        }
310        .instrument(span)
311        .await
312    }
313
314    pub async fn resume(
315        &self,
316        ctx: FlowContext<'_>,
317        snapshot: FlowSnapshot,
318        input: Value,
319    ) -> Result<FlowExecution> {
320        if snapshot.pack_id != ctx.pack_id {
321            bail!(
322                "snapshot pack {} does not match requested {}",
323                snapshot.pack_id,
324                ctx.pack_id
325            );
326        }
327        if snapshot.flow_id != ctx.flow_id {
328            bail!(
329                "snapshot flow {} does not match requested {}",
330                snapshot.flow_id,
331                ctx.flow_id
332            );
333        }
334        let flow_ir = self.get_or_load_flow(ctx.pack_id, ctx.flow_id).await?;
335        let mut state = snapshot.state;
336        state.replace_input(input);
337        state.ensure_entry();
338        self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
339            .await
340    }
341
342    async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
343        let flow_ir = self.get_or_load_flow(ctx.pack_id, ctx.flow_id).await?;
344        let state = ExecutionState::new(input);
345        self.drive_flow(ctx, flow_ir, state, None).await
346    }
347
348    async fn drive_flow(
349        &self,
350        ctx: &FlowContext<'_>,
351        flow_ir: HostFlow,
352        mut state: ExecutionState,
353        resume_from: Option<String>,
354    ) -> Result<FlowExecution> {
355        let mut current = match resume_from {
356            Some(node) => NodeId::from_str(&node)
357                .with_context(|| format!("invalid resume node id `{node}`"))?,
358            None => flow_ir
359                .start
360                .clone()
361                .or_else(|| flow_ir.nodes.keys().next().cloned())
362                .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
363        };
364
365        loop {
366            let node = flow_ir
367                .nodes
368                .get(&current)
369                .with_context(|| format!("node {} not found", current.as_str()))?;
370
371            let payload_template = node.payload_expr.clone();
372            let prev = state
373                .last_output
374                .as_ref()
375                .cloned()
376                .unwrap_or_else(|| Value::Object(JsonMap::new()));
377            let ctx_value = template_context(&state, prev);
378            let payload =
379                render_template_value(&payload_template, &ctx_value, TemplateOptions::default())
380                    .context("failed to render node input template")?;
381            let observed_payload = payload.clone();
382            let node_id = current.clone();
383            let event = NodeEvent {
384                context: ctx,
385                node_id: node_id.as_str(),
386                node,
387                payload: &observed_payload,
388            };
389            if let Some(observer) = ctx.observer {
390                observer.on_node_start(&event);
391            }
392            let DispatchOutcome {
393                output,
394                wait_reason,
395            } = self
396                .dispatch_node(ctx, node_id.as_str(), node, &mut state, payload)
397                .await?;
398
399            state.nodes.insert(node_id.clone().into(), output.clone());
400            state.last_output = Some(output.payload.clone());
401            if let Some(observer) = ctx.observer {
402                observer.on_node_end(&event, &output.payload);
403            }
404
405            let (next, should_exit) = match &node.routing {
406                Routing::Next { node_id } => (Some(node_id.clone()), false),
407                Routing::End | Routing::Reply => (None, true),
408                Routing::Branch { default, .. } => (default.clone(), default.is_none()),
409                Routing::Custom(raw) => {
410                    tracing::warn!(
411                        flow_id = %flow_ir.id,
412                        node_id = %node_id,
413                        routing = ?raw,
414                        "unsupported routing; terminating flow"
415                    );
416                    (None, true)
417                }
418            };
419
420            if let Some(wait_reason) = wait_reason {
421                let resume_target = next.clone().ok_or_else(|| {
422                    anyhow!(
423                        "session.wait node {} requires a non-empty route",
424                        current.as_str()
425                    )
426                })?;
427                let mut snapshot_state = state.clone();
428                snapshot_state.clear_egress();
429                let snapshot = FlowSnapshot {
430                    pack_id: ctx.pack_id.to_string(),
431                    flow_id: ctx.flow_id.to_string(),
432                    next_node: resume_target.as_str().to_string(),
433                    state: snapshot_state,
434                };
435                let output_value = state.clone().finalize_with(None);
436                return Ok(FlowExecution::waiting(
437                    output_value,
438                    FlowWait {
439                        reason: Some(wait_reason),
440                        snapshot,
441                    },
442                ));
443            }
444
445            if should_exit {
446                return Ok(FlowExecution::completed(
447                    state.finalize_with(Some(output.payload.clone())),
448                ));
449            }
450
451            match next {
452                Some(n) => current = n,
453                None => {
454                    return Ok(FlowExecution::completed(
455                        state.finalize_with(Some(output.payload.clone())),
456                    ));
457                }
458            }
459        }
460    }
461
462    async fn dispatch_node(
463        &self,
464        ctx: &FlowContext<'_>,
465        node_id: &str,
466        node: &HostNode,
467        state: &mut ExecutionState,
468        payload: Value,
469    ) -> Result<DispatchOutcome> {
470        match &node.kind {
471            NodeKind::Exec { target_component } => self
472                .execute_component_exec(
473                    ctx,
474                    node_id,
475                    node,
476                    payload,
477                    ComponentOverrides {
478                        component: Some(target_component.as_str()),
479                        operation: node.operation_name.as_deref(),
480                    },
481                )
482                .await
483                .map(DispatchOutcome::complete),
484            NodeKind::PackComponent { component_ref } => self
485                .execute_component_call(ctx, node_id, node, payload, component_ref.as_str())
486                .await
487                .map(DispatchOutcome::complete),
488            NodeKind::FlowCall => self
489                .execute_flow_call(ctx, payload)
490                .await
491                .map(DispatchOutcome::complete),
492            NodeKind::ProviderInvoke => self
493                .execute_provider_invoke(ctx, node_id, state, payload)
494                .await
495                .map(DispatchOutcome::complete),
496            NodeKind::BuiltinEmit { kind } => {
497                match kind {
498                    EmitKind::Log | EmitKind::Response => {}
499                    EmitKind::Other(component) => {
500                        tracing::debug!(%component, "handling emit.* as builtin");
501                    }
502                }
503                state.push_egress(payload.clone());
504                Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
505            }
506            NodeKind::Wait => {
507                let reason = extract_wait_reason(&payload);
508                Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
509            }
510        }
511    }
512
513    async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
514        #[derive(Deserialize)]
515        struct FlowCallPayload {
516            #[serde(alias = "flow")]
517            flow_id: String,
518            #[serde(default)]
519            input: Value,
520        }
521
522        let call: FlowCallPayload =
523            serde_json::from_value(payload).context("invalid payload for flow.call node")?;
524        if call.flow_id.trim().is_empty() {
525            bail!("flow.call requires a non-empty flow_id");
526        }
527
528        let sub_input = if call.input.is_null() {
529            Value::Null
530        } else {
531            call.input
532        };
533
534        let flow_id_owned = call.flow_id;
535        let action = "flow.call";
536        let sub_ctx = FlowContext {
537            tenant: ctx.tenant,
538            pack_id: ctx.pack_id,
539            flow_id: flow_id_owned.as_str(),
540            node_id: None,
541            tool: ctx.tool,
542            action: Some(action),
543            session_id: ctx.session_id,
544            provider_id: ctx.provider_id,
545            retry_config: ctx.retry_config,
546            observer: ctx.observer,
547            mocks: ctx.mocks,
548        };
549
550        let execution = Box::pin(self.execute(sub_ctx, sub_input))
551            .await
552            .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
553        match execution.status {
554            FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
555            FlowStatus::Waiting(wait) => bail!(
556                "flow.call cannot pause (flow {} waiting {:?})",
557                flow_id_owned,
558                wait.reason
559            ),
560        }
561    }
562
563    async fn execute_component_exec(
564        &self,
565        ctx: &FlowContext<'_>,
566        node_id: &str,
567        node: &HostNode,
568        payload: Value,
569        overrides: ComponentOverrides<'_>,
570    ) -> Result<NodeOutput> {
571        #[derive(Deserialize)]
572        struct ComponentPayload {
573            #[serde(default, alias = "component_ref", alias = "component")]
574            component: Option<String>,
575            #[serde(alias = "op")]
576            operation: Option<String>,
577            #[serde(default)]
578            input: Value,
579            #[serde(default)]
580            config: Value,
581        }
582
583        let payload: ComponentPayload =
584            serde_json::from_value(payload).context("invalid payload for component.exec")?;
585        let component_ref = overrides
586            .component
587            .map(str::to_string)
588            .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
589            .with_context(|| "component.exec requires a component_ref")?;
590        let operation = resolve_component_operation(
591            node_id,
592            node.component_id.as_str(),
593            payload.operation,
594            overrides.operation,
595            node.operation_in_mapping.as_deref(),
596        )?;
597        let call = ComponentCall {
598            component_ref,
599            operation,
600            input: payload.input,
601            config: payload.config,
602        };
603
604        self.invoke_component_call(ctx, node_id, call).await
605    }
606
607    async fn execute_component_call(
608        &self,
609        ctx: &FlowContext<'_>,
610        node_id: &str,
611        node: &HostNode,
612        payload: Value,
613        component_ref: &str,
614    ) -> Result<NodeOutput> {
615        let payload_operation = extract_operation_from_mapping(&payload);
616        let (input, config) = split_operation_payload(payload);
617        let operation = resolve_component_operation(
618            node_id,
619            node.component_id.as_str(),
620            payload_operation,
621            node.operation_name.as_deref(),
622            node.operation_in_mapping.as_deref(),
623        )?;
624        let call = ComponentCall {
625            component_ref: component_ref.to_string(),
626            operation,
627            input,
628            config,
629        };
630        self.invoke_component_call(ctx, node_id, call).await
631    }
632
633    async fn invoke_component_call(
634        &self,
635        ctx: &FlowContext<'_>,
636        node_id: &str,
637        call: ComponentCall,
638    ) -> Result<NodeOutput> {
639        let input_json = serde_json::to_string(&call.input)?;
640        let config_json = if call.config.is_null() {
641            None
642        } else {
643            Some(serde_json::to_string(&call.config)?)
644        };
645
646        let key = FlowKey {
647            pack_id: ctx.pack_id.to_string(),
648            flow_id: ctx.flow_id.to_string(),
649        };
650        let pack_idx = *self.flow_sources.get(&key).with_context(|| {
651            format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
652        })?;
653        let pack = Arc::clone(&self.packs[pack_idx]);
654        let exec_ctx = component_exec_ctx(ctx, node_id);
655        let value = pack
656            .invoke_component(
657                call.component_ref.as_str(),
658                exec_ctx,
659                call.operation.as_str(),
660                config_json,
661                input_json,
662            )
663            .await?;
664
665        Ok(NodeOutput::new(value))
666    }
667
668    async fn execute_provider_invoke(
669        &self,
670        ctx: &FlowContext<'_>,
671        node_id: &str,
672        state: &ExecutionState,
673        payload: Value,
674    ) -> Result<NodeOutput> {
675        #[derive(Deserialize)]
676        struct ProviderPayload {
677            #[serde(default)]
678            provider_id: Option<String>,
679            #[serde(default)]
680            provider_type: Option<String>,
681            #[serde(default, alias = "operation")]
682            op: Option<String>,
683            #[serde(default)]
684            input: Value,
685            #[serde(default)]
686            in_map: Value,
687            #[serde(default)]
688            out_map: Value,
689            #[serde(default)]
690            err_map: Value,
691        }
692
693        let payload: ProviderPayload =
694            serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
695        let op = payload
696            .op
697            .as_deref()
698            .filter(|v| !v.trim().is_empty())
699            .with_context(|| "provider.invoke requires an op")?
700            .to_string();
701
702        let prev = state
703            .last_output
704            .as_ref()
705            .cloned()
706            .unwrap_or_else(|| Value::Object(JsonMap::new()));
707        let base_ctx = template_context(state, prev);
708
709        let input_value = if !payload.in_map.is_null() {
710            let mut ctx_value = base_ctx.clone();
711            if let Value::Object(ref mut map) = ctx_value {
712                map.insert("input".into(), payload.input.clone());
713                map.insert("result".into(), payload.input.clone());
714            }
715            render_template_value(
716                &payload.in_map,
717                &ctx_value,
718                TemplateOptions {
719                    allow_pointer: true,
720                },
721            )
722            .context("failed to render provider.invoke in_map")?
723        } else if !payload.input.is_null() {
724            payload.input
725        } else {
726            Value::Null
727        };
728        let input_json = serde_json::to_vec(&input_value)?;
729
730        let key = FlowKey {
731            pack_id: ctx.pack_id.to_string(),
732            flow_id: ctx.flow_id.to_string(),
733        };
734        let pack_idx = *self.flow_sources.get(&key).with_context(|| {
735            format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
736        })?;
737        let pack = Arc::clone(&self.packs[pack_idx]);
738        let binding = pack.resolve_provider(
739            payload.provider_id.as_deref(),
740            payload.provider_type.as_deref(),
741        )?;
742        let exec_ctx = component_exec_ctx(ctx, node_id);
743        let result = pack
744            .invoke_provider(&binding, exec_ctx, &op, input_json)
745            .await?;
746
747        let output = if payload.out_map.is_null() {
748            result
749        } else {
750            let mut ctx_value = base_ctx;
751            if let Value::Object(ref mut map) = ctx_value {
752                map.insert("input".into(), result.clone());
753                map.insert("result".into(), result.clone());
754            }
755            render_template_value(
756                &payload.out_map,
757                &ctx_value,
758                TemplateOptions {
759                    allow_pointer: true,
760                },
761            )
762            .context("failed to render provider.invoke out_map")?
763        };
764        let _ = payload.err_map;
765        Ok(NodeOutput::new(output))
766    }
767
768    pub fn flows(&self) -> &[FlowDescriptor] {
769        &self.flows
770    }
771
772    pub fn flow_by_key(&self, pack_id: &str, flow_id: &str) -> Option<&FlowDescriptor> {
773        self.flows
774            .iter()
775            .find(|descriptor| descriptor.pack_id == pack_id && descriptor.id == flow_id)
776    }
777
778    pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
779        let mut matches = self
780            .flows
781            .iter()
782            .filter(|descriptor| descriptor.flow_type == flow_type);
783        let first = matches.next()?;
784        if matches.next().is_some() {
785            return None;
786        }
787        Some(first)
788    }
789
790    pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
791        let mut matches = self
792            .flows
793            .iter()
794            .filter(|descriptor| descriptor.id == flow_id);
795        let first = matches.next()?;
796        if matches.next().is_some() {
797            return None;
798        }
799        Some(first)
800    }
801}
802
803pub trait ExecutionObserver: Send + Sync {
804    fn on_node_start(&self, event: &NodeEvent<'_>);
805    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
806    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
807}
808
809pub struct NodeEvent<'a> {
810    pub context: &'a FlowContext<'a>,
811    pub node_id: &'a str,
812    pub node: &'a HostNode,
813    pub payload: &'a Value,
814}
815
816#[derive(Clone, Debug, Serialize, Deserialize)]
817pub struct ExecutionState {
818    #[serde(default)]
819    entry: Value,
820    #[serde(default)]
821    input: Value,
822    #[serde(default)]
823    nodes: HashMap<String, NodeOutput>,
824    #[serde(default)]
825    egress: Vec<Value>,
826    #[serde(default, skip_serializing_if = "Option::is_none")]
827    last_output: Option<Value>,
828}
829
830impl ExecutionState {
831    fn new(input: Value) -> Self {
832        Self {
833            entry: input.clone(),
834            input,
835            nodes: HashMap::new(),
836            egress: Vec::new(),
837            last_output: None,
838        }
839    }
840
841    fn ensure_entry(&mut self) {
842        if self.entry.is_null() {
843            self.entry = self.input.clone();
844        }
845    }
846
847    fn context(&self) -> Value {
848        let mut nodes = JsonMap::new();
849        for (id, output) in &self.nodes {
850            nodes.insert(
851                id.clone(),
852                json!({
853                    "ok": output.ok,
854                    "payload": output.payload.clone(),
855                    "meta": output.meta.clone(),
856                }),
857            );
858        }
859        json!({
860            "entry": self.entry.clone(),
861            "input": self.input.clone(),
862            "nodes": nodes,
863        })
864    }
865
866    fn outputs_map(&self) -> JsonMap<String, Value> {
867        let mut outputs = JsonMap::new();
868        for (id, output) in &self.nodes {
869            outputs.insert(id.clone(), output.payload.clone());
870        }
871        outputs
872    }
873    fn push_egress(&mut self, payload: Value) {
874        self.egress.push(payload);
875    }
876
877    fn replace_input(&mut self, input: Value) {
878        self.input = input;
879    }
880
881    fn clear_egress(&mut self) {
882        self.egress.clear();
883    }
884
885    fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
886        if self.egress.is_empty() {
887            return final_payload.unwrap_or(Value::Null);
888        }
889        let mut emitted = std::mem::take(&mut self.egress);
890        if let Some(value) = final_payload {
891            match value {
892                Value::Null => {}
893                Value::Array(items) => emitted.extend(items),
894                other => emitted.push(other),
895            }
896        }
897        Value::Array(emitted)
898    }
899}
900
901#[derive(Clone, Debug, Serialize, Deserialize)]
902struct NodeOutput {
903    ok: bool,
904    payload: Value,
905    meta: Value,
906}
907
908impl NodeOutput {
909    fn new(payload: Value) -> Self {
910        Self {
911            ok: true,
912            payload,
913            meta: Value::Null,
914        }
915    }
916}
917
918struct DispatchOutcome {
919    output: NodeOutput,
920    wait_reason: Option<String>,
921}
922
923impl DispatchOutcome {
924    fn complete(output: NodeOutput) -> Self {
925        Self {
926            output,
927            wait_reason: None,
928        }
929    }
930
931    fn wait(output: NodeOutput, reason: Option<String>) -> Self {
932        Self {
933            output,
934            wait_reason: reason,
935        }
936    }
937}
938
939fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
940    ComponentExecCtx {
941        tenant: ComponentTenantCtx {
942            tenant: ctx.tenant.to_string(),
943            team: None,
944            user: ctx.provider_id.map(str::to_string),
945            trace_id: None,
946            correlation_id: ctx.session_id.map(str::to_string),
947            deadline_unix_ms: None,
948            attempt: 1,
949            idempotency_key: ctx.session_id.map(str::to_string),
950        },
951        flow_id: ctx.flow_id.to_string(),
952        node_id: Some(node_id.to_string()),
953    }
954}
955
956fn extract_wait_reason(payload: &Value) -> Option<String> {
957    match payload {
958        Value::String(s) => Some(s.clone()),
959        Value::Object(map) => map
960            .get("reason")
961            .and_then(Value::as_str)
962            .map(|value| value.to_string()),
963        _ => None,
964    }
965}
966
967fn template_context(state: &ExecutionState, prev: Value) -> Value {
968    let entry = if state.entry.is_null() {
969        Value::Object(JsonMap::new())
970    } else {
971        state.entry.clone()
972    };
973    let mut ctx = JsonMap::new();
974    ctx.insert("entry".into(), entry);
975    ctx.insert("prev".into(), prev);
976    ctx.insert("node".into(), Value::Object(state.outputs_map()));
977    ctx.insert("state".into(), state.context());
978    Value::Object(ctx)
979}
980
981impl From<Flow> for HostFlow {
982    fn from(value: Flow) -> Self {
983        let mut nodes = IndexMap::new();
984        for (id, node) in value.nodes {
985            nodes.insert(id.clone(), HostNode::from(node));
986        }
987        let start = value
988            .entrypoints
989            .get("default")
990            .and_then(Value::as_str)
991            .and_then(|id| NodeId::from_str(id).ok())
992            .or_else(|| nodes.keys().next().cloned());
993        Self {
994            id: value.id.as_str().to_string(),
995            start,
996            nodes,
997        }
998    }
999}
1000
1001impl From<Node> for HostNode {
1002    fn from(node: Node) -> Self {
1003        let component_ref = node.component.id.as_str().to_string();
1004        let raw_operation = node.component.operation.clone();
1005        let operation_in_mapping = extract_operation_from_mapping(&node.input.mapping);
1006        let operation_is_component_exec = raw_operation.as_deref() == Some("component.exec");
1007        let operation_is_emit = raw_operation
1008            .as_deref()
1009            .map(|op| op.starts_with("emit."))
1010            .unwrap_or(false);
1011        let is_component_exec = component_ref == "component.exec" || operation_is_component_exec;
1012
1013        let kind = if is_component_exec {
1014            let target = if component_ref == "component.exec" {
1015                if let Some(op) = raw_operation
1016                    .as_deref()
1017                    .filter(|op| op.starts_with("emit."))
1018                {
1019                    op.to_string()
1020                } else {
1021                    extract_target_component(&node.input.mapping)
1022                        .unwrap_or_else(|| "component.exec".to_string())
1023                }
1024            } else {
1025                extract_target_component(&node.input.mapping)
1026                    .unwrap_or_else(|| component_ref.clone())
1027            };
1028            if target.starts_with("emit.") {
1029                NodeKind::BuiltinEmit {
1030                    kind: emit_kind_from_ref(&target),
1031                }
1032            } else {
1033                NodeKind::Exec {
1034                    target_component: target,
1035                }
1036            }
1037        } else if operation_is_emit {
1038            NodeKind::BuiltinEmit {
1039                kind: emit_kind_from_ref(raw_operation.as_deref().unwrap_or("emit.log")),
1040            }
1041        } else {
1042            match component_ref.as_str() {
1043                "flow.call" => NodeKind::FlowCall,
1044                "provider.invoke" => NodeKind::ProviderInvoke,
1045                "session.wait" => NodeKind::Wait,
1046                comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
1047                    kind: emit_kind_from_ref(comp),
1048                },
1049                other => NodeKind::PackComponent {
1050                    component_ref: other.to_string(),
1051                },
1052            }
1053        };
1054        let component_label = match &kind {
1055            NodeKind::Exec { .. } => "component.exec".to_string(),
1056            NodeKind::PackComponent { component_ref } => component_ref.clone(),
1057            NodeKind::ProviderInvoke => "provider.invoke".to_string(),
1058            NodeKind::FlowCall => "flow.call".to_string(),
1059            NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
1060            NodeKind::Wait => "session.wait".to_string(),
1061        };
1062        let operation_name = if is_component_exec && operation_is_component_exec {
1063            None
1064        } else {
1065            raw_operation.clone()
1066        };
1067        let payload_expr = match kind {
1068            NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
1069            _ => node.input.mapping.clone(),
1070        };
1071        Self {
1072            kind,
1073            component: component_label,
1074            component_id: if is_component_exec {
1075                "component.exec".to_string()
1076            } else {
1077                component_ref
1078            },
1079            operation_name,
1080            operation_in_mapping,
1081            payload_expr,
1082            routing: node.routing,
1083        }
1084    }
1085}
1086
1087fn extract_target_component(payload: &Value) -> Option<String> {
1088    match payload {
1089        Value::Object(map) => map
1090            .get("component")
1091            .or_else(|| map.get("component_ref"))
1092            .and_then(Value::as_str)
1093            .map(|s| s.to_string()),
1094        _ => None,
1095    }
1096}
1097
1098fn extract_operation_from_mapping(payload: &Value) -> Option<String> {
1099    match payload {
1100        Value::Object(map) => map
1101            .get("operation")
1102            .or_else(|| map.get("op"))
1103            .and_then(Value::as_str)
1104            .map(str::trim)
1105            .filter(|value| !value.is_empty())
1106            .map(|value| value.to_string()),
1107        _ => None,
1108    }
1109}
1110
1111fn extract_emit_payload(payload: &Value) -> Value {
1112    if let Value::Object(map) = payload {
1113        if let Some(input) = map.get("input") {
1114            return input.clone();
1115        }
1116        if let Some(inner) = map.get("payload") {
1117            return inner.clone();
1118        }
1119    }
1120    payload.clone()
1121}
1122
1123fn split_operation_payload(payload: Value) -> (Value, Value) {
1124    if let Value::Object(mut map) = payload.clone()
1125        && map.contains_key("input")
1126    {
1127        let input = map.remove("input").unwrap_or(Value::Null);
1128        let config = map.remove("config").unwrap_or(Value::Null);
1129        let legacy_only = map.keys().all(|key| {
1130            matches!(
1131                key.as_str(),
1132                "operation" | "op" | "component" | "component_ref"
1133            )
1134        });
1135        if legacy_only {
1136            return (input, config);
1137        }
1138    }
1139    (payload, Value::Null)
1140}
1141
1142fn resolve_component_operation(
1143    node_id: &str,
1144    component_label: &str,
1145    payload_operation: Option<String>,
1146    operation_override: Option<&str>,
1147    operation_in_mapping: Option<&str>,
1148) -> Result<String> {
1149    if let Some(op) = operation_override
1150        .map(str::trim)
1151        .filter(|value| !value.is_empty())
1152    {
1153        return Ok(op.to_string());
1154    }
1155
1156    if let Some(op) = payload_operation
1157        .as_deref()
1158        .map(str::trim)
1159        .filter(|value| !value.is_empty())
1160    {
1161        return Ok(op.to_string());
1162    }
1163
1164    let mut message = format!(
1165        "missing operation for node `{}` (component `{}`); expected node.component.operation to be set",
1166        node_id, component_label,
1167    );
1168    if let Some(found) = operation_in_mapping {
1169        message.push_str(&format!(
1170            ". Found operation in input.mapping (`{}`) but this is not used; pack compiler must preserve node.component.operation.",
1171            found
1172        ));
1173    }
1174    bail!(message);
1175}
1176
1177fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
1178    match component_ref {
1179        "emit.log" => EmitKind::Log,
1180        "emit.response" => EmitKind::Response,
1181        other => EmitKind::Other(other.to_string()),
1182    }
1183}
1184
1185fn emit_ref_from_kind(kind: &EmitKind) -> String {
1186    match kind {
1187        EmitKind::Log => "emit.log".to_string(),
1188        EmitKind::Response => "emit.response".to_string(),
1189        EmitKind::Other(other) => other.clone(),
1190    }
1191}
1192
1193#[cfg(test)]
1194mod tests {
1195    use super::*;
1196    use greentic_types::{
1197        Flow, FlowComponentRef, FlowId, FlowKind, InputMapping, Node, NodeId, OutputMapping,
1198        Routing, TelemetryHints,
1199    };
1200    use serde_json::json;
1201    use std::collections::BTreeMap;
1202    use std::str::FromStr;
1203    use std::sync::Mutex;
1204    use tokio::runtime::Runtime;
1205
1206    fn minimal_engine() -> FlowEngine {
1207        FlowEngine {
1208            packs: Vec::new(),
1209            flows: Vec::new(),
1210            flow_sources: HashMap::new(),
1211            flow_cache: RwLock::new(HashMap::new()),
1212            default_env: "local".to_string(),
1213        }
1214    }
1215
1216    #[test]
1217    fn templating_renders_with_partials_and_data() {
1218        let mut state = ExecutionState::new(json!({ "city": "London" }));
1219        state.nodes.insert(
1220            "forecast".to_string(),
1221            NodeOutput::new(json!({ "temp": "20C" })),
1222        );
1223
1224        // templating context includes node outputs for runner-side payload rendering.
1225        let ctx = state.context();
1226        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
1227    }
1228
1229    #[test]
1230    fn finalize_wraps_emitted_payloads() {
1231        let mut state = ExecutionState::new(json!({}));
1232        state.push_egress(json!({ "text": "first" }));
1233        state.push_egress(json!({ "text": "second" }));
1234        let result = state.finalize_with(Some(json!({ "text": "final" })));
1235        assert_eq!(
1236            result,
1237            json!([
1238                { "text": "first" },
1239                { "text": "second" },
1240                { "text": "final" }
1241            ])
1242        );
1243    }
1244
1245    #[test]
1246    fn finalize_flattens_final_array() {
1247        let mut state = ExecutionState::new(json!({}));
1248        state.push_egress(json!({ "text": "only" }));
1249        let result = state.finalize_with(Some(json!([
1250            { "text": "extra-1" },
1251            { "text": "extra-2" }
1252        ])));
1253        assert_eq!(
1254            result,
1255            json!([
1256                { "text": "only" },
1257                { "text": "extra-1" },
1258                { "text": "extra-2" }
1259            ])
1260        );
1261    }
1262
1263    #[test]
1264    fn missing_operation_reports_node_and_component() {
1265        let engine = minimal_engine();
1266        let rt = Runtime::new().unwrap();
1267        let retry_config = RetryConfig {
1268            max_attempts: 1,
1269            base_delay_ms: 1,
1270        };
1271        let ctx = FlowContext {
1272            tenant: "tenant",
1273            pack_id: "test-pack",
1274            flow_id: "flow",
1275            node_id: Some("missing-op"),
1276            tool: None,
1277            action: None,
1278            session_id: None,
1279            provider_id: None,
1280            retry_config,
1281            observer: None,
1282            mocks: None,
1283        };
1284        let node = HostNode {
1285            kind: NodeKind::Exec {
1286                target_component: "qa.process".into(),
1287            },
1288            component: "component.exec".into(),
1289            component_id: "component.exec".into(),
1290            operation_name: None,
1291            operation_in_mapping: None,
1292            payload_expr: Value::Null,
1293            routing: Routing::End,
1294        };
1295        let _state = ExecutionState::new(Value::Null);
1296        let payload = json!({ "component": "qa.process" });
1297        let err = rt
1298            .block_on(engine.execute_component_exec(
1299                &ctx,
1300                "missing-op",
1301                &node,
1302                payload,
1303                ComponentOverrides {
1304                    component: None,
1305                    operation: None,
1306                },
1307            ))
1308            .unwrap_err();
1309        let message = err.to_string();
1310        assert!(
1311            message.contains("missing operation for node `missing-op`"),
1312            "unexpected message: {message}"
1313        );
1314        assert!(
1315            message.contains("(component `component.exec`)"),
1316            "unexpected message: {message}"
1317        );
1318    }
1319
1320    #[test]
1321    fn missing_operation_mentions_mapping_hint() {
1322        let engine = minimal_engine();
1323        let rt = Runtime::new().unwrap();
1324        let retry_config = RetryConfig {
1325            max_attempts: 1,
1326            base_delay_ms: 1,
1327        };
1328        let ctx = FlowContext {
1329            tenant: "tenant",
1330            pack_id: "test-pack",
1331            flow_id: "flow",
1332            node_id: Some("missing-op-hint"),
1333            tool: None,
1334            action: None,
1335            session_id: None,
1336            provider_id: None,
1337            retry_config,
1338            observer: None,
1339            mocks: None,
1340        };
1341        let node = HostNode {
1342            kind: NodeKind::Exec {
1343                target_component: "qa.process".into(),
1344            },
1345            component: "component.exec".into(),
1346            component_id: "component.exec".into(),
1347            operation_name: None,
1348            operation_in_mapping: Some("render".into()),
1349            payload_expr: Value::Null,
1350            routing: Routing::End,
1351        };
1352        let _state = ExecutionState::new(Value::Null);
1353        let payload = json!({ "component": "qa.process" });
1354        let err = rt
1355            .block_on(engine.execute_component_exec(
1356                &ctx,
1357                "missing-op-hint",
1358                &node,
1359                payload,
1360                ComponentOverrides {
1361                    component: None,
1362                    operation: None,
1363                },
1364            ))
1365            .unwrap_err();
1366        let message = err.to_string();
1367        assert!(
1368            message.contains("missing operation for node `missing-op-hint`"),
1369            "unexpected message: {message}"
1370        );
1371        assert!(
1372            message.contains("Found operation in input.mapping (`render`)"),
1373            "unexpected message: {message}"
1374        );
1375    }
1376
1377    struct CountingObserver {
1378        starts: Mutex<Vec<String>>,
1379        ends: Mutex<Vec<Value>>,
1380    }
1381
1382    impl CountingObserver {
1383        fn new() -> Self {
1384            Self {
1385                starts: Mutex::new(Vec::new()),
1386                ends: Mutex::new(Vec::new()),
1387            }
1388        }
1389    }
1390
1391    impl ExecutionObserver for CountingObserver {
1392        fn on_node_start(&self, event: &NodeEvent<'_>) {
1393            self.starts.lock().unwrap().push(event.node_id.to_string());
1394        }
1395
1396        fn on_node_end(&self, _event: &NodeEvent<'_>, output: &Value) {
1397            self.ends.lock().unwrap().push(output.clone());
1398        }
1399
1400        fn on_node_error(&self, _event: &NodeEvent<'_>, _error: &dyn StdError) {}
1401    }
1402
1403    #[test]
1404    fn emits_end_event_for_successful_node() {
1405        let node_id = NodeId::from_str("emit").unwrap();
1406        let node = Node {
1407            id: node_id.clone(),
1408            component: FlowComponentRef {
1409                id: "emit.log".parse().unwrap(),
1410                pack_alias: None,
1411                operation: None,
1412            },
1413            input: InputMapping {
1414                mapping: json!({ "message": "logged" }),
1415            },
1416            output: OutputMapping {
1417                mapping: Value::Null,
1418            },
1419            routing: Routing::End,
1420            telemetry: TelemetryHints::default(),
1421        };
1422        let mut nodes = indexmap::IndexMap::default();
1423        nodes.insert(node_id.clone(), node);
1424        let flow = Flow {
1425            schema_version: "1.0".into(),
1426            id: FlowId::from_str("emit.flow").unwrap(),
1427            kind: FlowKind::Messaging,
1428            entrypoints: BTreeMap::from([(
1429                "default".to_string(),
1430                Value::String(node_id.to_string()),
1431            )]),
1432            nodes,
1433            metadata: Default::default(),
1434        };
1435        let host_flow = HostFlow::from(flow);
1436
1437        let engine = FlowEngine {
1438            packs: Vec::new(),
1439            flows: Vec::new(),
1440            flow_sources: HashMap::new(),
1441            flow_cache: RwLock::new(HashMap::from([(
1442                FlowKey {
1443                    pack_id: "test-pack".to_string(),
1444                    flow_id: "emit.flow".to_string(),
1445                },
1446                host_flow,
1447            )])),
1448            default_env: "local".to_string(),
1449        };
1450        let observer = CountingObserver::new();
1451        let ctx = FlowContext {
1452            tenant: "demo",
1453            pack_id: "test-pack",
1454            flow_id: "emit.flow",
1455            node_id: None,
1456            tool: None,
1457            action: None,
1458            session_id: None,
1459            provider_id: None,
1460            retry_config: RetryConfig {
1461                max_attempts: 1,
1462                base_delay_ms: 1,
1463            },
1464            observer: Some(&observer),
1465            mocks: None,
1466        };
1467
1468        let rt = Runtime::new().unwrap();
1469        let result = rt.block_on(engine.execute(ctx, Value::Null)).unwrap();
1470        assert!(matches!(result.status, FlowStatus::Completed));
1471
1472        let starts = observer.starts.lock().unwrap();
1473        let ends = observer.ends.lock().unwrap();
1474        assert_eq!(starts.len(), 1);
1475        assert_eq!(ends.len(), 1);
1476        assert_eq!(ends[0], json!({ "message": "logged" }));
1477    }
1478}
1479
1480use tracing::Instrument;
1481
1482pub struct FlowContext<'a> {
1483    pub tenant: &'a str,
1484    pub pack_id: &'a str,
1485    pub flow_id: &'a str,
1486    pub node_id: Option<&'a str>,
1487    pub tool: Option<&'a str>,
1488    pub action: Option<&'a str>,
1489    pub session_id: Option<&'a str>,
1490    pub provider_id: Option<&'a str>,
1491    pub retry_config: RetryConfig,
1492    pub observer: Option<&'a dyn ExecutionObserver>,
1493    pub mocks: Option<&'a MockLayer>,
1494}
1495
1496#[derive(Copy, Clone)]
1497pub struct RetryConfig {
1498    pub max_attempts: u32,
1499    pub base_delay_ms: u64,
1500}
1501
1502fn should_retry(err: &anyhow::Error) -> bool {
1503    let lower = err.to_string().to_lowercase();
1504    lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
1505}
1506
1507impl From<FlowRetryConfig> for RetryConfig {
1508    fn from(value: FlowRetryConfig) -> Self {
1509        Self {
1510            max_attempts: value.max_attempts.max(1),
1511            base_delay_ms: value.base_delay_ms.max(50),
1512        }
1513    }
1514}