Skip to main content

greentic_runner_host/runner/
engine.rs

1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use super::templating::{TemplateOptions, render_template_value};
18use crate::config::{FlowRetryConfig, HostConfig};
19use crate::pack::{FlowDescriptor, PackRuntime};
20use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
21use crate::validate::{
22    ValidationConfig, ValidationIssue, ValidationMode, validate_component_envelope,
23    validate_tool_envelope,
24};
25use greentic_types::{Flow, Node, NodeId, Routing};
26
27pub struct FlowEngine {
28    packs: Vec<Arc<PackRuntime>>,
29    flows: Vec<FlowDescriptor>,
30    flow_sources: HashMap<FlowKey, usize>,
31    flow_cache: RwLock<HashMap<FlowKey, HostFlow>>,
32    default_env: String,
33    validation: ValidationConfig,
34}
35
36#[derive(Clone, Debug, PartialEq, Eq, Hash)]
37struct FlowKey {
38    pack_id: String,
39    flow_id: String,
40}
41
42#[derive(Clone, Debug, Serialize, Deserialize)]
43pub struct FlowSnapshot {
44    pub pack_id: String,
45    pub flow_id: String,
46    pub next_node: String,
47    pub state: ExecutionState,
48}
49
50#[derive(Clone, Debug)]
51pub struct FlowWait {
52    pub reason: Option<String>,
53    pub snapshot: FlowSnapshot,
54}
55
56#[derive(Clone, Debug)]
57pub enum FlowStatus {
58    Completed,
59    Waiting(Box<FlowWait>),
60}
61
62#[derive(Clone, Debug)]
63pub struct FlowExecution {
64    pub output: Value,
65    pub status: FlowStatus,
66}
67
68#[derive(Clone, Debug)]
69struct HostFlow {
70    id: String,
71    start: Option<NodeId>,
72    nodes: IndexMap<NodeId, HostNode>,
73}
74
75#[derive(Clone, Debug)]
76pub struct HostNode {
77    kind: NodeKind,
78    /// Backwards-compatible component label for observers/transcript.
79    pub component: String,
80    component_id: String,
81    operation_name: Option<String>,
82    operation_in_mapping: Option<String>,
83    payload_expr: Value,
84    routing: Routing,
85}
86
87impl HostNode {
88    pub fn component_id(&self) -> &str {
89        &self.component_id
90    }
91
92    pub fn operation_name(&self) -> Option<&str> {
93        self.operation_name.as_deref()
94    }
95
96    pub fn operation_in_mapping(&self) -> Option<&str> {
97        self.operation_in_mapping.as_deref()
98    }
99}
100
101#[derive(Clone, Debug)]
102enum NodeKind {
103    Exec { target_component: String },
104    PackComponent { component_ref: String },
105    ProviderInvoke,
106    FlowCall,
107    BuiltinEmit { kind: EmitKind },
108    Wait,
109}
110
111#[derive(Clone, Debug)]
112enum EmitKind {
113    Log,
114    Response,
115    Other(String),
116}
117
118struct ComponentOverrides<'a> {
119    component: Option<&'a str>,
120    operation: Option<&'a str>,
121}
122
123struct ComponentCall {
124    component_ref: String,
125    operation: String,
126    input: Value,
127    config: Value,
128}
129
130impl FlowExecution {
131    fn completed(output: Value) -> Self {
132        Self {
133            output,
134            status: FlowStatus::Completed,
135        }
136    }
137
138    fn waiting(output: Value, wait: FlowWait) -> Self {
139        Self {
140            output,
141            status: FlowStatus::Waiting(Box::new(wait)),
142        }
143    }
144}
145
146impl FlowEngine {
147    pub async fn new(packs: Vec<Arc<PackRuntime>>, config: Arc<HostConfig>) -> Result<Self> {
148        let mut flow_sources: HashMap<FlowKey, usize> = HashMap::new();
149        let mut descriptors = Vec::new();
150        let mut bindings = HashMap::new();
151        for pack in &config.pack_bindings {
152            bindings.insert(pack.pack_id.clone(), pack.flows.clone());
153        }
154        let enforce_bindings = !bindings.is_empty();
155        for (idx, pack) in packs.iter().enumerate() {
156            let pack_id = pack.metadata().pack_id.clone();
157            if enforce_bindings && !bindings.contains_key(&pack_id) {
158                bail!("no gtbind entries found for pack {}", pack_id);
159            }
160            let flows = pack.list_flows().await?;
161            let allowed = bindings.get(&pack_id).map(|flows| {
162                flows
163                    .iter()
164                    .cloned()
165                    .collect::<std::collections::HashSet<_>>()
166            });
167            let mut seen = std::collections::HashSet::new();
168            for flow in flows {
169                if let Some(ref allow) = allowed
170                    && !allow.contains(&flow.id)
171                {
172                    continue;
173                }
174                seen.insert(flow.id.clone());
175                tracing::info!(
176                    flow_id = %flow.id,
177                    flow_type = %flow.flow_type,
178                    pack_id = %flow.pack_id,
179                    pack_index = idx,
180                    "registered flow"
181                );
182                flow_sources.insert(
183                    FlowKey {
184                        pack_id: flow.pack_id.clone(),
185                        flow_id: flow.id.clone(),
186                    },
187                    idx,
188                );
189                descriptors.retain(|existing: &FlowDescriptor| {
190                    !(existing.id == flow.id && existing.pack_id == flow.pack_id)
191                });
192                descriptors.push(flow);
193            }
194            if let Some(allow) = allowed {
195                let missing = allow.difference(&seen).cloned().collect::<Vec<_>>();
196                if !missing.is_empty() {
197                    bail!(
198                        "gtbind flow ids missing in pack {}: {}",
199                        pack_id,
200                        missing.join(", ")
201                    );
202                }
203            }
204        }
205
206        let mut flow_map = HashMap::new();
207        for flow in &descriptors {
208            let pack_id = flow.pack_id.clone();
209            if let Some(&pack_idx) = flow_sources.get(&FlowKey {
210                pack_id: pack_id.clone(),
211                flow_id: flow.id.clone(),
212            }) {
213                let pack_clone = Arc::clone(&packs[pack_idx]);
214                let flow_id = flow.id.clone();
215                let task_flow_id = flow_id.clone();
216                match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
217                    Ok(Ok(loaded_flow)) => {
218                        flow_map.insert(
219                            FlowKey {
220                                pack_id: pack_id.clone(),
221                                flow_id,
222                            },
223                            HostFlow::from(loaded_flow),
224                        );
225                    }
226                    Ok(Err(err)) => {
227                        tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
228                    }
229                    Err(err) => {
230                        tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
231                    }
232                }
233            }
234        }
235
236        Ok(Self {
237            packs,
238            flows: descriptors,
239            flow_sources,
240            flow_cache: RwLock::new(flow_map),
241            default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
242            validation: config.validation.clone(),
243        })
244    }
245
246    async fn get_or_load_flow(&self, pack_id: &str, flow_id: &str) -> Result<HostFlow> {
247        let key = FlowKey {
248            pack_id: pack_id.to_string(),
249            flow_id: flow_id.to_string(),
250        };
251        if let Some(flow) = self.flow_cache.read().get(&key).cloned() {
252            return Ok(flow);
253        }
254
255        let pack_idx = *self
256            .flow_sources
257            .get(&key)
258            .with_context(|| format!("flow {pack_id}:{flow_id} not registered"))?;
259        let pack = Arc::clone(&self.packs[pack_idx]);
260        let flow_id_owned = flow_id.to_string();
261        let task_flow_id = flow_id_owned.clone();
262        let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
263            .await
264            .context("failed to join flow metadata task")??;
265        let host_flow = HostFlow::from(flow);
266        self.flow_cache.write().insert(
267            FlowKey {
268                pack_id: pack_id.to_string(),
269                flow_id: flow_id_owned.clone(),
270            },
271            host_flow.clone(),
272        );
273        Ok(host_flow)
274    }
275
276    pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
277        let span = tracing::info_span!(
278            "flow.execute",
279            tenant = tracing::field::Empty,
280            flow_id = tracing::field::Empty,
281            node_id = tracing::field::Empty,
282            tool = tracing::field::Empty,
283            action = tracing::field::Empty
284        );
285        annotate_span(
286            &span,
287            &FlowSpanAttributes {
288                tenant: ctx.tenant,
289                flow_id: ctx.flow_id,
290                node_id: ctx.node_id,
291                tool: ctx.tool,
292                action: ctx.action,
293            },
294        );
295        set_flow_context(
296            &self.default_env,
297            ctx.tenant,
298            ctx.flow_id,
299            ctx.node_id,
300            ctx.provider_id,
301            ctx.session_id,
302        );
303        let retry_config = ctx.retry_config;
304        let original_input = input;
305        async move {
306            let mut attempt = 0u32;
307            loop {
308                attempt += 1;
309                match self.execute_once(&ctx, original_input.clone()).await {
310                    Ok(value) => return Ok(value),
311                    Err(err) => {
312                        if attempt >= retry_config.max_attempts || !should_retry(&err) {
313                            return Err(err);
314                        }
315                        let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
316                        tracing::warn!(
317                            tenant = ctx.tenant,
318                            flow_id = ctx.flow_id,
319                            attempt,
320                            max_attempts = retry_config.max_attempts,
321                            delay_ms = delay,
322                            error = %err,
323                            "transient flow execution failure, backing off"
324                        );
325                        tokio::time::sleep(Duration::from_millis(delay)).await;
326                    }
327                }
328            }
329        }
330        .instrument(span)
331        .await
332    }
333
334    pub async fn resume(
335        &self,
336        ctx: FlowContext<'_>,
337        snapshot: FlowSnapshot,
338        input: Value,
339    ) -> Result<FlowExecution> {
340        if snapshot.pack_id != ctx.pack_id {
341            bail!(
342                "snapshot pack {} does not match requested {}",
343                snapshot.pack_id,
344                ctx.pack_id
345            );
346        }
347        if snapshot.flow_id != ctx.flow_id {
348            bail!(
349                "snapshot flow {} does not match requested {}",
350                snapshot.flow_id,
351                ctx.flow_id
352            );
353        }
354        let flow_ir = self.get_or_load_flow(ctx.pack_id, ctx.flow_id).await?;
355        let mut state = snapshot.state;
356        state.replace_input(input);
357        state.ensure_entry();
358        self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
359            .await
360    }
361
362    async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
363        let flow_ir = self.get_or_load_flow(ctx.pack_id, ctx.flow_id).await?;
364        let state = ExecutionState::new(input);
365        self.drive_flow(ctx, flow_ir, state, None).await
366    }
367
368    async fn drive_flow(
369        &self,
370        ctx: &FlowContext<'_>,
371        flow_ir: HostFlow,
372        mut state: ExecutionState,
373        resume_from: Option<String>,
374    ) -> Result<FlowExecution> {
375        let mut current = match resume_from {
376            Some(node) => NodeId::from_str(&node)
377                .with_context(|| format!("invalid resume node id `{node}`"))?,
378            None => flow_ir
379                .start
380                .clone()
381                .or_else(|| flow_ir.nodes.keys().next().cloned())
382                .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
383        };
384
385        loop {
386            let node = flow_ir
387                .nodes
388                .get(&current)
389                .with_context(|| format!("node {} not found", current.as_str()))?;
390
391            let payload_template = node.payload_expr.clone();
392            let prev = state
393                .last_output
394                .as_ref()
395                .cloned()
396                .unwrap_or_else(|| Value::Object(JsonMap::new()));
397            let ctx_value = template_context(&state, prev);
398            let payload =
399                render_template_value(&payload_template, &ctx_value, TemplateOptions::default())
400                    .context("failed to render node input template")?;
401            let observed_payload = payload.clone();
402            let node_id = current.clone();
403            let event = NodeEvent {
404                context: ctx,
405                node_id: node_id.as_str(),
406                node,
407                payload: &observed_payload,
408            };
409            if let Some(observer) = ctx.observer {
410                observer.on_node_start(&event);
411            }
412            let dispatch = self
413                .dispatch_node(ctx, node_id.as_str(), node, &mut state, payload, &event)
414                .await;
415            let DispatchOutcome {
416                output,
417                wait_reason,
418            } = match dispatch {
419                Ok(outcome) => outcome,
420                Err(err) => {
421                    if let Some(observer) = ctx.observer {
422                        observer.on_node_error(&event, err.as_ref());
423                    }
424                    return Err(err);
425                }
426            };
427
428            state.nodes.insert(node_id.clone().into(), output.clone());
429            state.last_output = Some(output.payload.clone());
430            if let Some(observer) = ctx.observer {
431                observer.on_node_end(&event, &output.payload);
432            }
433
434            let (next, should_exit) = match &node.routing {
435                Routing::Next { node_id } => (Some(node_id.clone()), false),
436                Routing::End | Routing::Reply => (None, true),
437                Routing::Branch { default, .. } => (default.clone(), default.is_none()),
438                Routing::Custom(raw) => {
439                    tracing::warn!(
440                        flow_id = %flow_ir.id,
441                        node_id = %node_id,
442                        routing = ?raw,
443                        "unsupported routing; terminating flow"
444                    );
445                    (None, true)
446                }
447            };
448
449            if let Some(wait_reason) = wait_reason {
450                let resume_target = next.clone().ok_or_else(|| {
451                    anyhow!(
452                        "session.wait node {} requires a non-empty route",
453                        current.as_str()
454                    )
455                })?;
456                let mut snapshot_state = state.clone();
457                snapshot_state.clear_egress();
458                let snapshot = FlowSnapshot {
459                    pack_id: ctx.pack_id.to_string(),
460                    flow_id: ctx.flow_id.to_string(),
461                    next_node: resume_target.as_str().to_string(),
462                    state: snapshot_state,
463                };
464                let output_value = state.clone().finalize_with(None);
465                return Ok(FlowExecution::waiting(
466                    output_value,
467                    FlowWait {
468                        reason: Some(wait_reason),
469                        snapshot,
470                    },
471                ));
472            }
473
474            if should_exit {
475                return Ok(FlowExecution::completed(
476                    state.finalize_with(Some(output.payload.clone())),
477                ));
478            }
479
480            match next {
481                Some(n) => current = n,
482                None => {
483                    return Ok(FlowExecution::completed(
484                        state.finalize_with(Some(output.payload.clone())),
485                    ));
486                }
487            }
488        }
489    }
490
491    async fn dispatch_node(
492        &self,
493        ctx: &FlowContext<'_>,
494        node_id: &str,
495        node: &HostNode,
496        state: &mut ExecutionState,
497        payload: Value,
498        event: &NodeEvent<'_>,
499    ) -> Result<DispatchOutcome> {
500        match &node.kind {
501            NodeKind::Exec { target_component } => self
502                .execute_component_exec(
503                    ctx,
504                    node_id,
505                    node,
506                    payload,
507                    event,
508                    ComponentOverrides {
509                        component: Some(target_component.as_str()),
510                        operation: node.operation_name.as_deref(),
511                    },
512                )
513                .await
514                .map(DispatchOutcome::complete),
515            NodeKind::PackComponent { component_ref } => self
516                .execute_component_call(ctx, node_id, node, payload, component_ref.as_str(), event)
517                .await
518                .map(DispatchOutcome::complete),
519            NodeKind::FlowCall => self
520                .execute_flow_call(ctx, payload)
521                .await
522                .map(DispatchOutcome::complete),
523            NodeKind::ProviderInvoke => self
524                .execute_provider_invoke(ctx, node_id, state, payload, event)
525                .await
526                .map(DispatchOutcome::complete),
527            NodeKind::BuiltinEmit { kind } => {
528                match kind {
529                    EmitKind::Log | EmitKind::Response => {}
530                    EmitKind::Other(component) => {
531                        tracing::debug!(%component, "handling emit.* as builtin");
532                    }
533                }
534                state.push_egress(payload.clone());
535                Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
536            }
537            NodeKind::Wait => {
538                let reason = extract_wait_reason(&payload);
539                Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
540            }
541        }
542    }
543
544    async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
545        #[derive(Deserialize)]
546        struct FlowCallPayload {
547            #[serde(alias = "flow")]
548            flow_id: String,
549            #[serde(default)]
550            input: Value,
551        }
552
553        let call: FlowCallPayload =
554            serde_json::from_value(payload).context("invalid payload for flow.call node")?;
555        if call.flow_id.trim().is_empty() {
556            bail!("flow.call requires a non-empty flow_id");
557        }
558
559        let sub_input = if call.input.is_null() {
560            Value::Null
561        } else {
562            call.input
563        };
564
565        let flow_id_owned = call.flow_id;
566        let action = "flow.call";
567        let sub_ctx = FlowContext {
568            tenant: ctx.tenant,
569            pack_id: ctx.pack_id,
570            flow_id: flow_id_owned.as_str(),
571            node_id: None,
572            tool: ctx.tool,
573            action: Some(action),
574            session_id: ctx.session_id,
575            provider_id: ctx.provider_id,
576            retry_config: ctx.retry_config,
577            observer: ctx.observer,
578            mocks: ctx.mocks,
579        };
580
581        let execution = Box::pin(self.execute(sub_ctx, sub_input))
582            .await
583            .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
584        match execution.status {
585            FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
586            FlowStatus::Waiting(wait) => bail!(
587                "flow.call cannot pause (flow {} waiting {:?})",
588                flow_id_owned,
589                wait.reason
590            ),
591        }
592    }
593
594    async fn execute_component_exec(
595        &self,
596        ctx: &FlowContext<'_>,
597        node_id: &str,
598        node: &HostNode,
599        payload: Value,
600        event: &NodeEvent<'_>,
601        overrides: ComponentOverrides<'_>,
602    ) -> Result<NodeOutput> {
603        #[derive(Deserialize)]
604        struct ComponentPayload {
605            #[serde(default, alias = "component_ref", alias = "component")]
606            component: Option<String>,
607            #[serde(alias = "op")]
608            operation: Option<String>,
609            #[serde(default)]
610            input: Value,
611            #[serde(default)]
612            config: Value,
613        }
614
615        let payload: ComponentPayload =
616            serde_json::from_value(payload).context("invalid payload for component.exec")?;
617        let component_ref = overrides
618            .component
619            .map(str::to_string)
620            .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
621            .with_context(|| "component.exec requires a component_ref")?;
622        let operation = resolve_component_operation(
623            node_id,
624            node.component_id.as_str(),
625            payload.operation,
626            overrides.operation,
627            node.operation_in_mapping.as_deref(),
628        )?;
629        let call = ComponentCall {
630            component_ref,
631            operation,
632            input: payload.input,
633            config: payload.config,
634        };
635
636        self.invoke_component_call(ctx, node_id, call, event).await
637    }
638
639    async fn execute_component_call(
640        &self,
641        ctx: &FlowContext<'_>,
642        node_id: &str,
643        node: &HostNode,
644        payload: Value,
645        component_ref: &str,
646        event: &NodeEvent<'_>,
647    ) -> Result<NodeOutput> {
648        let payload_operation = extract_operation_from_mapping(&payload);
649        let (input, config) = split_operation_payload(payload);
650        let operation = resolve_component_operation(
651            node_id,
652            node.component_id.as_str(),
653            payload_operation,
654            node.operation_name.as_deref(),
655            node.operation_in_mapping.as_deref(),
656        )?;
657        let call = ComponentCall {
658            component_ref: component_ref.to_string(),
659            operation,
660            input,
661            config,
662        };
663        self.invoke_component_call(ctx, node_id, call, event).await
664    }
665
666    async fn invoke_component_call(
667        &self,
668        ctx: &FlowContext<'_>,
669        node_id: &str,
670        call: ComponentCall,
671        event: &NodeEvent<'_>,
672    ) -> Result<NodeOutput> {
673        self.validate_component(ctx, event, &call)?;
674        let input_json = serde_json::to_string(&call.input)?;
675        let config_json = if call.config.is_null() {
676            None
677        } else {
678            Some(serde_json::to_string(&call.config)?)
679        };
680
681        let key = FlowKey {
682            pack_id: ctx.pack_id.to_string(),
683            flow_id: ctx.flow_id.to_string(),
684        };
685        let pack_idx = *self.flow_sources.get(&key).with_context(|| {
686            format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
687        })?;
688        let pack = Arc::clone(&self.packs[pack_idx]);
689        let exec_ctx = component_exec_ctx(ctx, node_id);
690        let value = pack
691            .invoke_component(
692                call.component_ref.as_str(),
693                exec_ctx,
694                call.operation.as_str(),
695                config_json,
696                input_json,
697            )
698            .await?;
699
700        if let Some((code, message)) = component_error(&value) {
701            bail!(
702                "component {} failed: {}: {}",
703                call.component_ref,
704                code,
705                message
706            );
707        }
708        Ok(NodeOutput::new(value))
709    }
710
711    async fn execute_provider_invoke(
712        &self,
713        ctx: &FlowContext<'_>,
714        node_id: &str,
715        state: &ExecutionState,
716        payload: Value,
717        event: &NodeEvent<'_>,
718    ) -> Result<NodeOutput> {
719        #[derive(Deserialize)]
720        struct ProviderPayload {
721            #[serde(default)]
722            provider_id: Option<String>,
723            #[serde(default)]
724            provider_type: Option<String>,
725            #[serde(default, alias = "operation")]
726            op: Option<String>,
727            #[serde(default)]
728            input: Value,
729            #[serde(default)]
730            in_map: Value,
731            #[serde(default)]
732            out_map: Value,
733            #[serde(default)]
734            err_map: Value,
735        }
736
737        let payload: ProviderPayload =
738            serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
739        let op = payload
740            .op
741            .as_deref()
742            .filter(|v| !v.trim().is_empty())
743            .with_context(|| "provider.invoke requires an op")?
744            .to_string();
745
746        let prev = state
747            .last_output
748            .as_ref()
749            .cloned()
750            .unwrap_or_else(|| Value::Object(JsonMap::new()));
751        let base_ctx = template_context(state, prev);
752
753        let input_value = if !payload.in_map.is_null() {
754            let mut ctx_value = base_ctx.clone();
755            if let Value::Object(ref mut map) = ctx_value {
756                map.insert("input".into(), payload.input.clone());
757                map.insert("result".into(), payload.input.clone());
758            }
759            render_template_value(
760                &payload.in_map,
761                &ctx_value,
762                TemplateOptions {
763                    allow_pointer: true,
764                },
765            )
766            .context("failed to render provider.invoke in_map")?
767        } else if !payload.input.is_null() {
768            payload.input
769        } else {
770            Value::Null
771        };
772        let input_json = serde_json::to_vec(&input_value)?;
773
774        self.validate_tool(
775            ctx,
776            event,
777            payload.provider_id.as_deref(),
778            payload.provider_type.as_deref(),
779            &op,
780            &input_value,
781        )?;
782
783        let key = FlowKey {
784            pack_id: ctx.pack_id.to_string(),
785            flow_id: ctx.flow_id.to_string(),
786        };
787        let pack_idx = *self.flow_sources.get(&key).with_context(|| {
788            format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
789        })?;
790        let pack = Arc::clone(&self.packs[pack_idx]);
791        let binding = pack.resolve_provider(
792            payload.provider_id.as_deref(),
793            payload.provider_type.as_deref(),
794        )?;
795        let exec_ctx = component_exec_ctx(ctx, node_id);
796        let result = pack
797            .invoke_provider(&binding, exec_ctx, &op, input_json)
798            .await?;
799
800        let output = if payload.out_map.is_null() {
801            result
802        } else {
803            let mut ctx_value = base_ctx;
804            if let Value::Object(ref mut map) = ctx_value {
805                map.insert("input".into(), result.clone());
806                map.insert("result".into(), result.clone());
807            }
808            render_template_value(
809                &payload.out_map,
810                &ctx_value,
811                TemplateOptions {
812                    allow_pointer: true,
813                },
814            )
815            .context("failed to render provider.invoke out_map")?
816        };
817        let _ = payload.err_map;
818        Ok(NodeOutput::new(output))
819    }
820
821    fn validate_component(
822        &self,
823        ctx: &FlowContext<'_>,
824        event: &NodeEvent<'_>,
825        call: &ComponentCall,
826    ) -> Result<()> {
827        if self.validation.mode == ValidationMode::Off {
828            return Ok(());
829        }
830        let mut metadata = JsonMap::new();
831        metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
832        if let Some(id) = ctx.session_id {
833            metadata.insert("session".to_string(), json!({ "id": id }));
834        }
835        let envelope = json!({
836            "component_id": call.component_ref,
837            "operation": call.operation,
838            "input": call.input,
839            "config": call.config,
840            "metadata": Value::Object(metadata),
841        });
842        let issues = validate_component_envelope(&envelope);
843        self.report_validation(ctx, event, "component", issues)
844    }
845
846    fn validate_tool(
847        &self,
848        ctx: &FlowContext<'_>,
849        event: &NodeEvent<'_>,
850        provider_id: Option<&str>,
851        provider_type: Option<&str>,
852        operation: &str,
853        input: &Value,
854    ) -> Result<()> {
855        if self.validation.mode == ValidationMode::Off {
856            return Ok(());
857        }
858        let tool_id = provider_id.or(provider_type).unwrap_or("provider.invoke");
859        let mut metadata = JsonMap::new();
860        metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
861        if let Some(id) = ctx.session_id {
862            metadata.insert("session".to_string(), json!({ "id": id }));
863        }
864        let envelope = json!({
865            "tool_id": tool_id,
866            "operation": operation,
867            "input": input,
868            "metadata": Value::Object(metadata),
869        });
870        let issues = validate_tool_envelope(&envelope);
871        self.report_validation(ctx, event, "tool", issues)
872    }
873
874    fn report_validation(
875        &self,
876        ctx: &FlowContext<'_>,
877        event: &NodeEvent<'_>,
878        kind: &str,
879        issues: Vec<ValidationIssue>,
880    ) -> Result<()> {
881        if issues.is_empty() {
882            return Ok(());
883        }
884        if let Some(observer) = ctx.observer {
885            observer.on_validation(event, &issues);
886        }
887        match self.validation.mode {
888            ValidationMode::Warn => {
889                tracing::warn!(
890                    tenant = ctx.tenant,
891                    flow_id = ctx.flow_id,
892                    node_id = event.node_id,
893                    kind,
894                    issues = ?issues,
895                    "invocation envelope validation issues"
896                );
897                Ok(())
898            }
899            ValidationMode::Error => {
900                tracing::error!(
901                    tenant = ctx.tenant,
902                    flow_id = ctx.flow_id,
903                    node_id = event.node_id,
904                    kind,
905                    issues = ?issues,
906                    "invocation envelope validation failed"
907                );
908                bail!("invocation_validation_failed");
909            }
910            ValidationMode::Off => Ok(()),
911        }
912    }
913
914    pub fn flows(&self) -> &[FlowDescriptor] {
915        &self.flows
916    }
917
918    pub fn flow_by_key(&self, pack_id: &str, flow_id: &str) -> Option<&FlowDescriptor> {
919        self.flows
920            .iter()
921            .find(|descriptor| descriptor.pack_id == pack_id && descriptor.id == flow_id)
922    }
923
924    pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
925        let mut matches = self
926            .flows
927            .iter()
928            .filter(|descriptor| descriptor.flow_type == flow_type);
929        let first = matches.next()?;
930        if matches.next().is_some() {
931            return None;
932        }
933        Some(first)
934    }
935
936    pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
937        let mut matches = self
938            .flows
939            .iter()
940            .filter(|descriptor| descriptor.id == flow_id);
941        let first = matches.next()?;
942        if matches.next().is_some() {
943            return None;
944        }
945        Some(first)
946    }
947}
948
949pub trait ExecutionObserver: Send + Sync {
950    fn on_node_start(&self, event: &NodeEvent<'_>);
951    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
952    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
953    fn on_validation(&self, _event: &NodeEvent<'_>, _issues: &[ValidationIssue]) {}
954}
955
956pub struct NodeEvent<'a> {
957    pub context: &'a FlowContext<'a>,
958    pub node_id: &'a str,
959    pub node: &'a HostNode,
960    pub payload: &'a Value,
961}
962
963#[derive(Clone, Debug, Serialize, Deserialize)]
964pub struct ExecutionState {
965    #[serde(default)]
966    entry: Value,
967    #[serde(default)]
968    input: Value,
969    #[serde(default)]
970    nodes: HashMap<String, NodeOutput>,
971    #[serde(default)]
972    egress: Vec<Value>,
973    #[serde(default, skip_serializing_if = "Option::is_none")]
974    last_output: Option<Value>,
975}
976
977impl ExecutionState {
978    fn new(input: Value) -> Self {
979        Self {
980            entry: input.clone(),
981            input,
982            nodes: HashMap::new(),
983            egress: Vec::new(),
984            last_output: None,
985        }
986    }
987
988    fn ensure_entry(&mut self) {
989        if self.entry.is_null() {
990            self.entry = self.input.clone();
991        }
992    }
993
994    fn context(&self) -> Value {
995        let mut nodes = JsonMap::new();
996        for (id, output) in &self.nodes {
997            nodes.insert(
998                id.clone(),
999                json!({
1000                    "ok": output.ok,
1001                    "payload": output.payload.clone(),
1002                    "meta": output.meta.clone(),
1003                }),
1004            );
1005        }
1006        json!({
1007            "entry": self.entry.clone(),
1008            "input": self.input.clone(),
1009            "nodes": nodes,
1010        })
1011    }
1012
1013    fn outputs_map(&self) -> JsonMap<String, Value> {
1014        let mut outputs = JsonMap::new();
1015        for (id, output) in &self.nodes {
1016            outputs.insert(id.clone(), output.payload.clone());
1017        }
1018        outputs
1019    }
1020    fn push_egress(&mut self, payload: Value) {
1021        self.egress.push(payload);
1022    }
1023
1024    fn replace_input(&mut self, input: Value) {
1025        self.input = input;
1026    }
1027
1028    fn clear_egress(&mut self) {
1029        self.egress.clear();
1030    }
1031
1032    fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
1033        if self.egress.is_empty() {
1034            return final_payload.unwrap_or(Value::Null);
1035        }
1036        let mut emitted = std::mem::take(&mut self.egress);
1037        if let Some(value) = final_payload {
1038            match value {
1039                Value::Null => {}
1040                Value::Array(items) => emitted.extend(items),
1041                other => emitted.push(other),
1042            }
1043        }
1044        Value::Array(emitted)
1045    }
1046}
1047
1048#[derive(Clone, Debug, Serialize, Deserialize)]
1049struct NodeOutput {
1050    ok: bool,
1051    payload: Value,
1052    meta: Value,
1053}
1054
1055impl NodeOutput {
1056    fn new(payload: Value) -> Self {
1057        Self {
1058            ok: true,
1059            payload,
1060            meta: Value::Null,
1061        }
1062    }
1063}
1064
1065struct DispatchOutcome {
1066    output: NodeOutput,
1067    wait_reason: Option<String>,
1068}
1069
1070impl DispatchOutcome {
1071    fn complete(output: NodeOutput) -> Self {
1072        Self {
1073            output,
1074            wait_reason: None,
1075        }
1076    }
1077
1078    fn wait(output: NodeOutput, reason: Option<String>) -> Self {
1079        Self {
1080            output,
1081            wait_reason: reason,
1082        }
1083    }
1084}
1085
1086fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
1087    ComponentExecCtx {
1088        tenant: ComponentTenantCtx {
1089            tenant: ctx.tenant.to_string(),
1090            team: None,
1091            user: ctx.provider_id.map(str::to_string),
1092            trace_id: None,
1093            correlation_id: ctx.session_id.map(str::to_string),
1094            deadline_unix_ms: None,
1095            attempt: 1,
1096            idempotency_key: ctx.session_id.map(str::to_string),
1097        },
1098        flow_id: ctx.flow_id.to_string(),
1099        node_id: Some(node_id.to_string()),
1100    }
1101}
1102
1103fn component_error(value: &Value) -> Option<(String, String)> {
1104    let obj = value.as_object()?;
1105    let ok = obj.get("ok").and_then(Value::as_bool)?;
1106    if ok {
1107        return None;
1108    }
1109    let err = obj.get("error")?.as_object()?;
1110    let code = err
1111        .get("code")
1112        .and_then(Value::as_str)
1113        .unwrap_or("component_error");
1114    let message = err
1115        .get("message")
1116        .and_then(Value::as_str)
1117        .unwrap_or("component reported error");
1118    Some((code.to_string(), message.to_string()))
1119}
1120
1121fn extract_wait_reason(payload: &Value) -> Option<String> {
1122    match payload {
1123        Value::String(s) => Some(s.clone()),
1124        Value::Object(map) => map
1125            .get("reason")
1126            .and_then(Value::as_str)
1127            .map(|value| value.to_string()),
1128        _ => None,
1129    }
1130}
1131
1132fn template_context(state: &ExecutionState, prev: Value) -> Value {
1133    let entry = if state.entry.is_null() {
1134        Value::Object(JsonMap::new())
1135    } else {
1136        state.entry.clone()
1137    };
1138    let mut ctx = JsonMap::new();
1139    ctx.insert("entry".into(), entry);
1140    ctx.insert("prev".into(), prev);
1141    ctx.insert("node".into(), Value::Object(state.outputs_map()));
1142    ctx.insert("state".into(), state.context());
1143    Value::Object(ctx)
1144}
1145
1146impl From<Flow> for HostFlow {
1147    fn from(value: Flow) -> Self {
1148        let mut nodes = IndexMap::new();
1149        for (id, node) in value.nodes {
1150            nodes.insert(id.clone(), HostNode::from(node));
1151        }
1152        let start = value
1153            .entrypoints
1154            .get("default")
1155            .and_then(Value::as_str)
1156            .and_then(|id| NodeId::from_str(id).ok())
1157            .or_else(|| nodes.keys().next().cloned());
1158        Self {
1159            id: value.id.as_str().to_string(),
1160            start,
1161            nodes,
1162        }
1163    }
1164}
1165
1166impl From<Node> for HostNode {
1167    fn from(node: Node) -> Self {
1168        let component_ref = node.component.id.as_str().to_string();
1169        let raw_operation = node.component.operation.clone();
1170        let operation_in_mapping = extract_operation_from_mapping(&node.input.mapping);
1171        let operation_is_component_exec = raw_operation.as_deref() == Some("component.exec");
1172        let operation_is_emit = raw_operation
1173            .as_deref()
1174            .map(|op| op.starts_with("emit."))
1175            .unwrap_or(false);
1176        let is_component_exec = component_ref == "component.exec" || operation_is_component_exec;
1177
1178        let kind = if is_component_exec {
1179            let target = if component_ref == "component.exec" {
1180                if let Some(op) = raw_operation
1181                    .as_deref()
1182                    .filter(|op| op.starts_with("emit."))
1183                {
1184                    op.to_string()
1185                } else {
1186                    extract_target_component(&node.input.mapping)
1187                        .unwrap_or_else(|| "component.exec".to_string())
1188                }
1189            } else {
1190                extract_target_component(&node.input.mapping)
1191                    .unwrap_or_else(|| component_ref.clone())
1192            };
1193            if target.starts_with("emit.") {
1194                NodeKind::BuiltinEmit {
1195                    kind: emit_kind_from_ref(&target),
1196                }
1197            } else {
1198                NodeKind::Exec {
1199                    target_component: target,
1200                }
1201            }
1202        } else if operation_is_emit {
1203            NodeKind::BuiltinEmit {
1204                kind: emit_kind_from_ref(raw_operation.as_deref().unwrap_or("emit.log")),
1205            }
1206        } else {
1207            match component_ref.as_str() {
1208                "flow.call" => NodeKind::FlowCall,
1209                "provider.invoke" => NodeKind::ProviderInvoke,
1210                "session.wait" => NodeKind::Wait,
1211                comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
1212                    kind: emit_kind_from_ref(comp),
1213                },
1214                other => NodeKind::PackComponent {
1215                    component_ref: other.to_string(),
1216                },
1217            }
1218        };
1219        let component_label = match &kind {
1220            NodeKind::Exec { .. } => "component.exec".to_string(),
1221            NodeKind::PackComponent { component_ref } => component_ref.clone(),
1222            NodeKind::ProviderInvoke => "provider.invoke".to_string(),
1223            NodeKind::FlowCall => "flow.call".to_string(),
1224            NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
1225            NodeKind::Wait => "session.wait".to_string(),
1226        };
1227        let operation_name = if is_component_exec && operation_is_component_exec {
1228            None
1229        } else {
1230            raw_operation.clone()
1231        };
1232        let payload_expr = match kind {
1233            NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
1234            _ => node.input.mapping.clone(),
1235        };
1236        Self {
1237            kind,
1238            component: component_label,
1239            component_id: if is_component_exec {
1240                "component.exec".to_string()
1241            } else {
1242                component_ref
1243            },
1244            operation_name,
1245            operation_in_mapping,
1246            payload_expr,
1247            routing: node.routing,
1248        }
1249    }
1250}
1251
1252fn extract_target_component(payload: &Value) -> Option<String> {
1253    match payload {
1254        Value::Object(map) => map
1255            .get("component")
1256            .or_else(|| map.get("component_ref"))
1257            .and_then(Value::as_str)
1258            .map(|s| s.to_string()),
1259        _ => None,
1260    }
1261}
1262
1263fn extract_operation_from_mapping(payload: &Value) -> Option<String> {
1264    match payload {
1265        Value::Object(map) => map
1266            .get("operation")
1267            .or_else(|| map.get("op"))
1268            .and_then(Value::as_str)
1269            .map(str::trim)
1270            .filter(|value| !value.is_empty())
1271            .map(|value| value.to_string()),
1272        _ => None,
1273    }
1274}
1275
1276fn extract_emit_payload(payload: &Value) -> Value {
1277    if let Value::Object(map) = payload {
1278        if let Some(input) = map.get("input") {
1279            return input.clone();
1280        }
1281        if let Some(inner) = map.get("payload") {
1282            return inner.clone();
1283        }
1284    }
1285    payload.clone()
1286}
1287
1288fn split_operation_payload(payload: Value) -> (Value, Value) {
1289    if let Value::Object(mut map) = payload.clone()
1290        && map.contains_key("input")
1291    {
1292        let input = map.remove("input").unwrap_or(Value::Null);
1293        let config = map.remove("config").unwrap_or(Value::Null);
1294        let legacy_only = map.keys().all(|key| {
1295            matches!(
1296                key.as_str(),
1297                "operation" | "op" | "component" | "component_ref"
1298            )
1299        });
1300        if legacy_only {
1301            return (input, config);
1302        }
1303    }
1304    (payload, Value::Null)
1305}
1306
1307fn resolve_component_operation(
1308    node_id: &str,
1309    component_label: &str,
1310    payload_operation: Option<String>,
1311    operation_override: Option<&str>,
1312    operation_in_mapping: Option<&str>,
1313) -> Result<String> {
1314    if let Some(op) = operation_override
1315        .map(str::trim)
1316        .filter(|value| !value.is_empty())
1317    {
1318        return Ok(op.to_string());
1319    }
1320
1321    if let Some(op) = payload_operation
1322        .as_deref()
1323        .map(str::trim)
1324        .filter(|value| !value.is_empty())
1325    {
1326        return Ok(op.to_string());
1327    }
1328
1329    let mut message = format!(
1330        "missing operation for node `{}` (component `{}`); expected node.component.operation to be set",
1331        node_id, component_label,
1332    );
1333    if let Some(found) = operation_in_mapping {
1334        message.push_str(&format!(
1335            ". Found operation in input.mapping (`{}`) but this is not used; pack compiler must preserve node.component.operation.",
1336            found
1337        ));
1338    }
1339    bail!(message);
1340}
1341
1342fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
1343    match component_ref {
1344        "emit.log" => EmitKind::Log,
1345        "emit.response" => EmitKind::Response,
1346        other => EmitKind::Other(other.to_string()),
1347    }
1348}
1349
1350fn emit_ref_from_kind(kind: &EmitKind) -> String {
1351    match kind {
1352        EmitKind::Log => "emit.log".to_string(),
1353        EmitKind::Response => "emit.response".to_string(),
1354        EmitKind::Other(other) => other.clone(),
1355    }
1356}
1357
1358#[cfg(test)]
1359mod tests {
1360    use super::*;
1361    use crate::validate::{ValidationConfig, ValidationMode};
1362    use greentic_types::{
1363        Flow, FlowComponentRef, FlowId, FlowKind, InputMapping, Node, NodeId, OutputMapping,
1364        Routing, TelemetryHints,
1365    };
1366    use serde_json::json;
1367    use std::collections::BTreeMap;
1368    use std::str::FromStr;
1369    use std::sync::Mutex;
1370    use tokio::runtime::Runtime;
1371
1372    fn minimal_engine() -> FlowEngine {
1373        FlowEngine {
1374            packs: Vec::new(),
1375            flows: Vec::new(),
1376            flow_sources: HashMap::new(),
1377            flow_cache: RwLock::new(HashMap::new()),
1378            default_env: "local".to_string(),
1379            validation: ValidationConfig {
1380                mode: ValidationMode::Off,
1381            },
1382        }
1383    }
1384
1385    #[test]
1386    fn templating_renders_with_partials_and_data() {
1387        let mut state = ExecutionState::new(json!({ "city": "London" }));
1388        state.nodes.insert(
1389            "forecast".to_string(),
1390            NodeOutput::new(json!({ "temp": "20C" })),
1391        );
1392
1393        // templating context includes node outputs for runner-side payload rendering.
1394        let ctx = state.context();
1395        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
1396    }
1397
1398    #[test]
1399    fn finalize_wraps_emitted_payloads() {
1400        let mut state = ExecutionState::new(json!({}));
1401        state.push_egress(json!({ "text": "first" }));
1402        state.push_egress(json!({ "text": "second" }));
1403        let result = state.finalize_with(Some(json!({ "text": "final" })));
1404        assert_eq!(
1405            result,
1406            json!([
1407                { "text": "first" },
1408                { "text": "second" },
1409                { "text": "final" }
1410            ])
1411        );
1412    }
1413
1414    #[test]
1415    fn finalize_flattens_final_array() {
1416        let mut state = ExecutionState::new(json!({}));
1417        state.push_egress(json!({ "text": "only" }));
1418        let result = state.finalize_with(Some(json!([
1419            { "text": "extra-1" },
1420            { "text": "extra-2" }
1421        ])));
1422        assert_eq!(
1423            result,
1424            json!([
1425                { "text": "only" },
1426                { "text": "extra-1" },
1427                { "text": "extra-2" }
1428            ])
1429        );
1430    }
1431
1432    #[test]
1433    fn missing_operation_reports_node_and_component() {
1434        let engine = minimal_engine();
1435        let rt = Runtime::new().unwrap();
1436        let retry_config = RetryConfig {
1437            max_attempts: 1,
1438            base_delay_ms: 1,
1439        };
1440        let ctx = FlowContext {
1441            tenant: "tenant",
1442            pack_id: "test-pack",
1443            flow_id: "flow",
1444            node_id: Some("missing-op"),
1445            tool: None,
1446            action: None,
1447            session_id: None,
1448            provider_id: None,
1449            retry_config,
1450            observer: None,
1451            mocks: None,
1452        };
1453        let node = HostNode {
1454            kind: NodeKind::Exec {
1455                target_component: "qa.process".into(),
1456            },
1457            component: "component.exec".into(),
1458            component_id: "component.exec".into(),
1459            operation_name: None,
1460            operation_in_mapping: None,
1461            payload_expr: Value::Null,
1462            routing: Routing::End,
1463        };
1464        let _state = ExecutionState::new(Value::Null);
1465        let payload = json!({ "component": "qa.process" });
1466        let event = NodeEvent {
1467            context: &ctx,
1468            node_id: "missing-op",
1469            node: &node,
1470            payload: &payload,
1471        };
1472        let err = rt
1473            .block_on(engine.execute_component_exec(
1474                &ctx,
1475                "missing-op",
1476                &node,
1477                payload.clone(),
1478                &event,
1479                ComponentOverrides {
1480                    component: None,
1481                    operation: None,
1482                },
1483            ))
1484            .unwrap_err();
1485        let message = err.to_string();
1486        assert!(
1487            message.contains("missing operation for node `missing-op`"),
1488            "unexpected message: {message}"
1489        );
1490        assert!(
1491            message.contains("(component `component.exec`)"),
1492            "unexpected message: {message}"
1493        );
1494    }
1495
1496    #[test]
1497    fn missing_operation_mentions_mapping_hint() {
1498        let engine = minimal_engine();
1499        let rt = Runtime::new().unwrap();
1500        let retry_config = RetryConfig {
1501            max_attempts: 1,
1502            base_delay_ms: 1,
1503        };
1504        let ctx = FlowContext {
1505            tenant: "tenant",
1506            pack_id: "test-pack",
1507            flow_id: "flow",
1508            node_id: Some("missing-op-hint"),
1509            tool: None,
1510            action: None,
1511            session_id: None,
1512            provider_id: None,
1513            retry_config,
1514            observer: None,
1515            mocks: None,
1516        };
1517        let node = HostNode {
1518            kind: NodeKind::Exec {
1519                target_component: "qa.process".into(),
1520            },
1521            component: "component.exec".into(),
1522            component_id: "component.exec".into(),
1523            operation_name: None,
1524            operation_in_mapping: Some("render".into()),
1525            payload_expr: Value::Null,
1526            routing: Routing::End,
1527        };
1528        let _state = ExecutionState::new(Value::Null);
1529        let payload = json!({ "component": "qa.process" });
1530        let event = NodeEvent {
1531            context: &ctx,
1532            node_id: "missing-op-hint",
1533            node: &node,
1534            payload: &payload,
1535        };
1536        let err = rt
1537            .block_on(engine.execute_component_exec(
1538                &ctx,
1539                "missing-op-hint",
1540                &node,
1541                payload.clone(),
1542                &event,
1543                ComponentOverrides {
1544                    component: None,
1545                    operation: None,
1546                },
1547            ))
1548            .unwrap_err();
1549        let message = err.to_string();
1550        assert!(
1551            message.contains("missing operation for node `missing-op-hint`"),
1552            "unexpected message: {message}"
1553        );
1554        assert!(
1555            message.contains("Found operation in input.mapping (`render`)"),
1556            "unexpected message: {message}"
1557        );
1558    }
1559
1560    struct CountingObserver {
1561        starts: Mutex<Vec<String>>,
1562        ends: Mutex<Vec<Value>>,
1563    }
1564
1565    impl CountingObserver {
1566        fn new() -> Self {
1567            Self {
1568                starts: Mutex::new(Vec::new()),
1569                ends: Mutex::new(Vec::new()),
1570            }
1571        }
1572    }
1573
1574    impl ExecutionObserver for CountingObserver {
1575        fn on_node_start(&self, event: &NodeEvent<'_>) {
1576            self.starts.lock().unwrap().push(event.node_id.to_string());
1577        }
1578
1579        fn on_node_end(&self, _event: &NodeEvent<'_>, output: &Value) {
1580            self.ends.lock().unwrap().push(output.clone());
1581        }
1582
1583        fn on_node_error(&self, _event: &NodeEvent<'_>, _error: &dyn StdError) {}
1584    }
1585
1586    #[test]
1587    fn emits_end_event_for_successful_node() {
1588        let node_id = NodeId::from_str("emit").unwrap();
1589        let node = Node {
1590            id: node_id.clone(),
1591            component: FlowComponentRef {
1592                id: "emit.log".parse().unwrap(),
1593                pack_alias: None,
1594                operation: None,
1595            },
1596            input: InputMapping {
1597                mapping: json!({ "message": "logged" }),
1598            },
1599            output: OutputMapping {
1600                mapping: Value::Null,
1601            },
1602            routing: Routing::End,
1603            telemetry: TelemetryHints::default(),
1604        };
1605        let mut nodes = indexmap::IndexMap::default();
1606        nodes.insert(node_id.clone(), node);
1607        let flow = Flow {
1608            schema_version: "1.0".into(),
1609            id: FlowId::from_str("emit.flow").unwrap(),
1610            kind: FlowKind::Messaging,
1611            entrypoints: BTreeMap::from([(
1612                "default".to_string(),
1613                Value::String(node_id.to_string()),
1614            )]),
1615            nodes,
1616            metadata: Default::default(),
1617        };
1618        let host_flow = HostFlow::from(flow);
1619
1620        let engine = FlowEngine {
1621            packs: Vec::new(),
1622            flows: Vec::new(),
1623            flow_sources: HashMap::new(),
1624            flow_cache: RwLock::new(HashMap::from([(
1625                FlowKey {
1626                    pack_id: "test-pack".to_string(),
1627                    flow_id: "emit.flow".to_string(),
1628                },
1629                host_flow,
1630            )])),
1631            default_env: "local".to_string(),
1632            validation: ValidationConfig {
1633                mode: ValidationMode::Off,
1634            },
1635        };
1636        let observer = CountingObserver::new();
1637        let ctx = FlowContext {
1638            tenant: "demo",
1639            pack_id: "test-pack",
1640            flow_id: "emit.flow",
1641            node_id: None,
1642            tool: None,
1643            action: None,
1644            session_id: None,
1645            provider_id: None,
1646            retry_config: RetryConfig {
1647                max_attempts: 1,
1648                base_delay_ms: 1,
1649            },
1650            observer: Some(&observer),
1651            mocks: None,
1652        };
1653
1654        let rt = Runtime::new().unwrap();
1655        let result = rt.block_on(engine.execute(ctx, Value::Null)).unwrap();
1656        assert!(matches!(result.status, FlowStatus::Completed));
1657
1658        let starts = observer.starts.lock().unwrap();
1659        let ends = observer.ends.lock().unwrap();
1660        assert_eq!(starts.len(), 1);
1661        assert_eq!(ends.len(), 1);
1662        assert_eq!(ends[0], json!({ "message": "logged" }));
1663    }
1664}
1665
1666use tracing::Instrument;
1667
1668pub struct FlowContext<'a> {
1669    pub tenant: &'a str,
1670    pub pack_id: &'a str,
1671    pub flow_id: &'a str,
1672    pub node_id: Option<&'a str>,
1673    pub tool: Option<&'a str>,
1674    pub action: Option<&'a str>,
1675    pub session_id: Option<&'a str>,
1676    pub provider_id: Option<&'a str>,
1677    pub retry_config: RetryConfig,
1678    pub observer: Option<&'a dyn ExecutionObserver>,
1679    pub mocks: Option<&'a MockLayer>,
1680}
1681
1682#[derive(Copy, Clone)]
1683pub struct RetryConfig {
1684    pub max_attempts: u32,
1685    pub base_delay_ms: u64,
1686}
1687
1688fn should_retry(err: &anyhow::Error) -> bool {
1689    let lower = err.to_string().to_lowercase();
1690    lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
1691}
1692
1693impl From<FlowRetryConfig> for RetryConfig {
1694    fn from(value: FlowRetryConfig) -> Self {
1695        Self {
1696            max_attempts: value.max_attempts.max(1),
1697            base_delay_ms: value.base_delay_ms.max(50),
1698        }
1699    }
1700}