greentic_runner_host/runner/
engine.rs

1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use crate::config::{FlowRetryConfig, HostConfig};
18use crate::pack::{FlowDescriptor, PackRuntime};
19use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
20use greentic_types::{Flow, Node, NodeId, Routing};
21
22pub struct FlowEngine {
23    packs: Vec<Arc<PackRuntime>>,
24    flows: Vec<FlowDescriptor>,
25    flow_sources: HashMap<String, usize>,
26    flow_cache: RwLock<HashMap<String, HostFlow>>,
27    default_env: String,
28}
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
31pub struct FlowSnapshot {
32    pub flow_id: String,
33    pub next_node: String,
34    pub state: ExecutionState,
35}
36
37#[derive(Clone, Debug)]
38pub struct FlowWait {
39    pub reason: Option<String>,
40    pub snapshot: FlowSnapshot,
41}
42
43#[derive(Clone, Debug)]
44pub enum FlowStatus {
45    Completed,
46    Waiting(FlowWait),
47}
48
49#[derive(Clone, Debug)]
50pub struct FlowExecution {
51    pub output: Value,
52    pub status: FlowStatus,
53}
54
55#[derive(Clone, Debug)]
56struct HostFlow {
57    id: String,
58    start: Option<NodeId>,
59    nodes: IndexMap<NodeId, HostNode>,
60}
61
62#[derive(Clone, Debug)]
63pub struct HostNode {
64    kind: NodeKind,
65    /// Backwards-compatible component label for observers/transcript.
66    pub component: String,
67    payload_expr: Value,
68    routing: Routing,
69}
70
71#[derive(Clone, Debug)]
72enum NodeKind {
73    Exec { target_component: String },
74    PackComponent { component_ref: String },
75    ProviderInvoke,
76    FlowCall,
77    BuiltinEmit { kind: EmitKind },
78    Wait,
79}
80
/// Flavour of a builtin `emit.*` node.
#[derive(Debug, Clone)]
enum EmitKind {
    /// `emit.log`
    Log,
    /// `emit.response`
    Response,
    /// Any other `emit.*` reference, stored verbatim.
    Other(String),
}
87
88impl FlowExecution {
89    fn completed(output: Value) -> Self {
90        Self {
91            output,
92            status: FlowStatus::Completed,
93        }
94    }
95
96    fn waiting(output: Value, wait: FlowWait) -> Self {
97        Self {
98            output,
99            status: FlowStatus::Waiting(wait),
100        }
101    }
102}
103
104impl FlowEngine {
105    pub async fn new(packs: Vec<Arc<PackRuntime>>, _config: Arc<HostConfig>) -> Result<Self> {
106        let mut flow_sources = HashMap::new();
107        let mut descriptors = Vec::new();
108        for (idx, pack) in packs.iter().enumerate() {
109            let flows = pack.list_flows().await?;
110            for flow in flows {
111                tracing::info!(
112                    flow_id = %flow.id,
113                    flow_type = %flow.flow_type,
114                    pack_index = idx,
115                    "registered flow"
116                );
117                flow_sources.insert(flow.id.clone(), idx);
118                descriptors.retain(|existing: &FlowDescriptor| existing.id != flow.id);
119                descriptors.push(flow);
120            }
121        }
122
123        let mut flow_map = HashMap::new();
124        for flow in &descriptors {
125            if let Some(&pack_idx) = flow_sources.get(&flow.id) {
126                let pack_clone = Arc::clone(&packs[pack_idx]);
127                let flow_id = flow.id.clone();
128                let task_flow_id = flow_id.clone();
129                match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
130                    Ok(Ok(flow)) => {
131                        flow_map.insert(flow_id, HostFlow::from(flow));
132                    }
133                    Ok(Err(err)) => {
134                        tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
135                    }
136                    Err(err) => {
137                        tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
138                    }
139                }
140            }
141        }
142
143        Ok(Self {
144            packs,
145            flows: descriptors,
146            flow_sources,
147            flow_cache: RwLock::new(flow_map),
148            default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
149        })
150    }
151
152    async fn get_or_load_flow(&self, flow_id: &str) -> Result<HostFlow> {
153        if let Some(flow) = self.flow_cache.read().get(flow_id).cloned() {
154            return Ok(flow);
155        }
156
157        let pack_idx = *self
158            .flow_sources
159            .get(flow_id)
160            .with_context(|| format!("flow {flow_id} not registered"))?;
161        let pack = Arc::clone(&self.packs[pack_idx]);
162        let flow_id_owned = flow_id.to_string();
163        let task_flow_id = flow_id_owned.clone();
164        let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
165            .await
166            .context("failed to join flow metadata task")??;
167        let host_flow = HostFlow::from(flow);
168        self.flow_cache
169            .write()
170            .insert(flow_id_owned.clone(), host_flow.clone());
171        Ok(host_flow)
172    }
173
174    pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
175        let span = tracing::info_span!(
176            "flow.execute",
177            tenant = tracing::field::Empty,
178            flow_id = tracing::field::Empty,
179            node_id = tracing::field::Empty,
180            tool = tracing::field::Empty,
181            action = tracing::field::Empty
182        );
183        annotate_span(
184            &span,
185            &FlowSpanAttributes {
186                tenant: ctx.tenant,
187                flow_id: ctx.flow_id,
188                node_id: ctx.node_id,
189                tool: ctx.tool,
190                action: ctx.action,
191            },
192        );
193        set_flow_context(
194            &self.default_env,
195            ctx.tenant,
196            ctx.flow_id,
197            ctx.node_id,
198            ctx.provider_id,
199            ctx.session_id,
200        );
201        let retry_config = ctx.retry_config;
202        let original_input = input;
203        async move {
204            let mut attempt = 0u32;
205            loop {
206                attempt += 1;
207                match self.execute_once(&ctx, original_input.clone()).await {
208                    Ok(value) => return Ok(value),
209                    Err(err) => {
210                        if attempt >= retry_config.max_attempts || !should_retry(&err) {
211                            return Err(err);
212                        }
213                        let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
214                        tracing::warn!(
215                            tenant = ctx.tenant,
216                            flow_id = ctx.flow_id,
217                            attempt,
218                            max_attempts = retry_config.max_attempts,
219                            delay_ms = delay,
220                            error = %err,
221                            "transient flow execution failure, backing off"
222                        );
223                        tokio::time::sleep(Duration::from_millis(delay)).await;
224                    }
225                }
226            }
227        }
228        .instrument(span)
229        .await
230    }
231
232    pub async fn resume(
233        &self,
234        ctx: FlowContext<'_>,
235        snapshot: FlowSnapshot,
236        input: Value,
237    ) -> Result<FlowExecution> {
238        if snapshot.flow_id != ctx.flow_id {
239            bail!(
240                "snapshot flow {} does not match requested {}",
241                snapshot.flow_id,
242                ctx.flow_id
243            );
244        }
245        let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
246        let mut state = snapshot.state;
247        state.replace_input(input);
248        self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
249            .await
250    }
251
252    async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
253        let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
254        let state = ExecutionState::new(input);
255        self.drive_flow(ctx, flow_ir, state, None).await
256    }
257
258    async fn drive_flow(
259        &self,
260        ctx: &FlowContext<'_>,
261        flow_ir: HostFlow,
262        mut state: ExecutionState,
263        resume_from: Option<String>,
264    ) -> Result<FlowExecution> {
265        let mut current = match resume_from {
266            Some(node) => NodeId::from_str(&node)
267                .with_context(|| format!("invalid resume node id `{node}`"))?,
268            None => flow_ir
269                .start
270                .clone()
271                .or_else(|| flow_ir.nodes.keys().next().cloned())
272                .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
273        };
274
275        loop {
276            let node = flow_ir
277                .nodes
278                .get(&current)
279                .with_context(|| format!("node {} not found", current.as_str()))?;
280
281            let payload = node.payload_expr.clone();
282            let observed_payload = payload.clone();
283            let node_id = current.clone();
284            let event = NodeEvent {
285                context: ctx,
286                node_id: node_id.as_str(),
287                node,
288                payload: &observed_payload,
289            };
290            if let Some(observer) = ctx.observer {
291                observer.on_node_start(&event);
292            }
293            let DispatchOutcome {
294                output,
295                wait_reason,
296            } = self
297                .dispatch_node(ctx, node_id.as_str(), node, &mut state, payload)
298                .await?;
299
300            state.nodes.insert(node_id.clone().into(), output.clone());
301
302            let (next, should_exit) = match &node.routing {
303                Routing::Next { node_id } => (Some(node_id.clone()), false),
304                Routing::End | Routing::Reply => (None, true),
305                Routing::Branch { default, .. } => (default.clone(), default.is_none()),
306                Routing::Custom(raw) => {
307                    tracing::warn!(
308                        flow_id = %flow_ir.id,
309                        node_id = %node_id,
310                        routing = ?raw,
311                        "unsupported routing; terminating flow"
312                    );
313                    (None, true)
314                }
315            };
316
317            if let Some(wait_reason) = wait_reason {
318                let resume_target = next.clone().ok_or_else(|| {
319                    anyhow!(
320                        "session.wait node {} requires a non-empty route",
321                        current.as_str()
322                    )
323                })?;
324                let mut snapshot_state = state.clone();
325                snapshot_state.clear_egress();
326                let snapshot = FlowSnapshot {
327                    flow_id: ctx.flow_id.to_string(),
328                    next_node: resume_target.as_str().to_string(),
329                    state: snapshot_state,
330                };
331                let output_value = state.clone().finalize_with(None);
332                return Ok(FlowExecution::waiting(
333                    output_value,
334                    FlowWait {
335                        reason: Some(wait_reason),
336                        snapshot,
337                    },
338                ));
339            }
340
341            if should_exit {
342                return Ok(FlowExecution::completed(
343                    state.finalize_with(Some(output.payload.clone())),
344                ));
345            }
346
347            match next {
348                Some(n) => current = n,
349                None => {
350                    return Ok(FlowExecution::completed(
351                        state.finalize_with(Some(output.payload.clone())),
352                    ));
353                }
354            }
355        }
356    }
357
358    async fn dispatch_node(
359        &self,
360        ctx: &FlowContext<'_>,
361        node_id: &str,
362        node: &HostNode,
363        state: &mut ExecutionState,
364        payload: Value,
365    ) -> Result<DispatchOutcome> {
366        match &node.kind {
367            NodeKind::Exec { target_component } => self
368                .execute_component_exec(
369                    ctx,
370                    node_id,
371                    state,
372                    payload,
373                    Some(target_component.as_str()),
374                )
375                .await
376                .map(DispatchOutcome::complete),
377            NodeKind::PackComponent { component_ref } => self
378                .execute_component_exec(ctx, node_id, state, payload, Some(component_ref.as_str()))
379                .await
380                .map(DispatchOutcome::complete),
381            NodeKind::FlowCall => self
382                .execute_flow_call(ctx, payload)
383                .await
384                .map(DispatchOutcome::complete),
385            NodeKind::ProviderInvoke => self
386                .execute_provider_invoke(ctx, node_id, state, payload)
387                .await
388                .map(DispatchOutcome::complete),
389            NodeKind::BuiltinEmit { kind } => {
390                match kind {
391                    EmitKind::Log | EmitKind::Response => {}
392                    EmitKind::Other(component) => {
393                        tracing::debug!(%component, "handling emit.* as builtin");
394                    }
395                }
396                state.push_egress(payload.clone());
397                Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
398            }
399            NodeKind::Wait => {
400                let reason = extract_wait_reason(&payload);
401                Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
402            }
403        }
404    }
405
406    async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
407        #[derive(Deserialize)]
408        struct FlowCallPayload {
409            #[serde(alias = "flow")]
410            flow_id: String,
411            #[serde(default)]
412            input: Value,
413        }
414
415        let call: FlowCallPayload =
416            serde_json::from_value(payload).context("invalid payload for flow.call node")?;
417        if call.flow_id.trim().is_empty() {
418            bail!("flow.call requires a non-empty flow_id");
419        }
420
421        let sub_input = if call.input.is_null() {
422            Value::Null
423        } else {
424            call.input
425        };
426
427        let flow_id_owned = call.flow_id;
428        let action = "flow.call";
429        let sub_ctx = FlowContext {
430            tenant: ctx.tenant,
431            flow_id: flow_id_owned.as_str(),
432            node_id: None,
433            tool: ctx.tool,
434            action: Some(action),
435            session_id: ctx.session_id,
436            provider_id: ctx.provider_id,
437            retry_config: ctx.retry_config,
438            observer: ctx.observer,
439            mocks: ctx.mocks,
440        };
441
442        let execution = Box::pin(self.execute(sub_ctx, sub_input))
443            .await
444            .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
445        match execution.status {
446            FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
447            FlowStatus::Waiting(wait) => bail!(
448                "flow.call cannot pause (flow {} waiting {:?})",
449                flow_id_owned,
450                wait.reason
451            ),
452        }
453    }
454
455    async fn execute_component_exec(
456        &self,
457        ctx: &FlowContext<'_>,
458        node_id: &str,
459        state: &ExecutionState,
460        payload: Value,
461        component_override: Option<&str>,
462    ) -> Result<NodeOutput> {
463        #[derive(Deserialize)]
464        struct ComponentPayload {
465            #[serde(default, alias = "component_ref", alias = "component")]
466            component: Option<String>,
467            #[serde(alias = "op")]
468            operation: Option<String>,
469            #[serde(default)]
470            input: Value,
471            #[serde(default)]
472            config: Value,
473        }
474
475        let payload: ComponentPayload =
476            serde_json::from_value(payload).context("invalid payload for component.exec")?;
477        let component_ref = component_override
478            .map(str::to_string)
479            .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
480            .with_context(|| "component.exec requires a component_ref")?;
481        let operation = payload
482            .operation
483            .filter(|v| !v.trim().is_empty())
484            .with_context(|| "component.exec requires an operation")?;
485        let mut input = payload.input;
486        if let Value::Object(mut map) = input {
487            map.entry("state".to_string())
488                .or_insert_with(|| state.context());
489            input = Value::Object(map);
490        }
491        let input_json = serde_json::to_string(&input)?;
492        let config_json = if payload.config.is_null() {
493            None
494        } else {
495            Some(serde_json::to_string(&payload.config)?)
496        };
497
498        let pack_idx = *self
499            .flow_sources
500            .get(ctx.flow_id)
501            .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
502        let pack = Arc::clone(&self.packs[pack_idx]);
503        let exec_ctx = component_exec_ctx(ctx, node_id);
504        let value = pack
505            .invoke_component(
506                &component_ref,
507                exec_ctx,
508                &operation,
509                config_json,
510                input_json,
511            )
512            .await?;
513
514        Ok(NodeOutput::new(value))
515    }
516
517    async fn execute_provider_invoke(
518        &self,
519        ctx: &FlowContext<'_>,
520        node_id: &str,
521        state: &ExecutionState,
522        payload: Value,
523    ) -> Result<NodeOutput> {
524        #[derive(Deserialize)]
525        struct ProviderPayload {
526            #[serde(default)]
527            provider_id: Option<String>,
528            #[serde(default)]
529            provider_type: Option<String>,
530            #[serde(default, alias = "operation")]
531            op: Option<String>,
532            #[serde(default)]
533            input: Value,
534            #[serde(default)]
535            in_map: Value,
536            #[serde(default)]
537            out_map: Value,
538            #[serde(default)]
539            err_map: Value,
540        }
541
542        let payload: ProviderPayload =
543            serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
544        let op = payload
545            .op
546            .as_deref()
547            .filter(|v| !v.trim().is_empty())
548            .with_context(|| "provider.invoke requires an op")?
549            .to_string();
550
551        let mut input_value = if !payload.in_map.is_null() {
552            let ctx_value = mapping_ctx(payload.input.clone(), state);
553            apply_mapping(&payload.in_map, &ctx_value)
554                .context("failed to evaluate provider.invoke in_map")?
555        } else if !payload.input.is_null() {
556            payload.input
557        } else {
558            Value::Null
559        };
560
561        if let Value::Object(ref mut map) = input_value {
562            map.entry("state".to_string())
563                .or_insert_with(|| state.context());
564        }
565        let input_json = serde_json::to_vec(&input_value)?;
566
567        let pack_idx = *self
568            .flow_sources
569            .get(ctx.flow_id)
570            .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
571        let pack = Arc::clone(&self.packs[pack_idx]);
572        let binding = pack.resolve_provider(
573            payload.provider_id.as_deref(),
574            payload.provider_type.as_deref(),
575        )?;
576        let exec_ctx = component_exec_ctx(ctx, node_id);
577        let result = pack
578            .invoke_provider(&binding, exec_ctx, &op, input_json)
579            .await?;
580
581        let output = if payload.out_map.is_null() {
582            result
583        } else {
584            let ctx_value = mapping_ctx(result, state);
585            apply_mapping(&payload.out_map, &ctx_value)
586                .context("failed to evaluate provider.invoke out_map")?
587        };
588        let _ = payload.err_map;
589        Ok(NodeOutput::new(output))
590    }
591
592    pub fn flows(&self) -> &[FlowDescriptor] {
593        &self.flows
594    }
595
596    pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
597        self.flows
598            .iter()
599            .find(|descriptor| descriptor.flow_type == flow_type)
600    }
601
602    pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
603        self.flows
604            .iter()
605            .find(|descriptor| descriptor.id == flow_id)
606    }
607}
608
609pub trait ExecutionObserver: Send + Sync {
610    fn on_node_start(&self, event: &NodeEvent<'_>);
611    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
612    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
613}
614
615pub struct NodeEvent<'a> {
616    pub context: &'a FlowContext<'a>,
617    pub node_id: &'a str,
618    pub node: &'a HostNode,
619    pub payload: &'a Value,
620}
621
622#[derive(Clone, Debug, Serialize, Deserialize)]
623pub struct ExecutionState {
624    input: Value,
625    nodes: HashMap<String, NodeOutput>,
626    egress: Vec<Value>,
627}
628
629impl ExecutionState {
630    fn new(input: Value) -> Self {
631        Self {
632            input,
633            nodes: HashMap::new(),
634            egress: Vec::new(),
635        }
636    }
637
638    fn context(&self) -> Value {
639        let mut nodes = JsonMap::new();
640        for (id, output) in &self.nodes {
641            nodes.insert(
642                id.clone(),
643                json!({
644                    "ok": output.ok,
645                    "payload": output.payload.clone(),
646                    "meta": output.meta.clone(),
647                }),
648            );
649        }
650        json!({
651            "input": self.input.clone(),
652            "nodes": nodes,
653        })
654    }
655    fn push_egress(&mut self, payload: Value) {
656        self.egress.push(payload);
657    }
658
659    fn replace_input(&mut self, input: Value) {
660        self.input = input;
661    }
662
663    fn clear_egress(&mut self) {
664        self.egress.clear();
665    }
666
667    fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
668        if self.egress.is_empty() {
669            return final_payload.unwrap_or(Value::Null);
670        }
671        let mut emitted = std::mem::take(&mut self.egress);
672        if let Some(value) = final_payload {
673            match value {
674                Value::Null => {}
675                Value::Array(items) => emitted.extend(items),
676                other => emitted.push(other),
677            }
678        }
679        Value::Array(emitted)
680    }
681}
682
683#[derive(Clone, Debug, Serialize, Deserialize)]
684struct NodeOutput {
685    ok: bool,
686    payload: Value,
687    meta: Value,
688}
689
690impl NodeOutput {
691    fn new(payload: Value) -> Self {
692        Self {
693            ok: true,
694            payload,
695            meta: Value::Null,
696        }
697    }
698}
699
700struct DispatchOutcome {
701    output: NodeOutput,
702    wait_reason: Option<String>,
703}
704
705impl DispatchOutcome {
706    fn complete(output: NodeOutput) -> Self {
707        Self {
708            output,
709            wait_reason: None,
710        }
711    }
712
713    fn wait(output: NodeOutput, reason: Option<String>) -> Self {
714        Self {
715            output,
716            wait_reason: reason,
717        }
718    }
719}
720
721fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
722    ComponentExecCtx {
723        tenant: ComponentTenantCtx {
724            tenant: ctx.tenant.to_string(),
725            team: None,
726            user: ctx.provider_id.map(str::to_string),
727            trace_id: None,
728            correlation_id: ctx.session_id.map(str::to_string),
729            deadline_unix_ms: None,
730            attempt: 1,
731            idempotency_key: ctx.session_id.map(str::to_string),
732        },
733        flow_id: ctx.flow_id.to_string(),
734        node_id: Some(node_id.to_string()),
735    }
736}
737
738fn extract_wait_reason(payload: &Value) -> Option<String> {
739    match payload {
740        Value::String(s) => Some(s.clone()),
741        Value::Object(map) => map
742            .get("reason")
743            .and_then(Value::as_str)
744            .map(|value| value.to_string()),
745        _ => None,
746    }
747}
748
749fn mapping_ctx(root: Value, state: &ExecutionState) -> Value {
750    json!({
751        "input": root.clone(),
752        "result": root,
753        "state": state.context(),
754    })
755}
756
757fn apply_mapping(template: &Value, ctx: &Value) -> Result<Value> {
758    match template {
759        Value::String(path) if path.starts_with('/') => ctx
760            .pointer(path)
761            .cloned()
762            .ok_or_else(|| anyhow!("mapping path `{path}` not found")),
763        Value::Array(items) => {
764            let mut mapped = Vec::with_capacity(items.len());
765            for item in items {
766                mapped.push(apply_mapping(item, ctx)?);
767            }
768            Ok(Value::Array(mapped))
769        }
770        Value::Object(map) => {
771            let mut out = serde_json::Map::new();
772            for (key, value) in map {
773                out.insert(key.clone(), apply_mapping(value, ctx)?);
774            }
775            Ok(Value::Object(out))
776        }
777        other => Ok(other.clone()),
778    }
779}
780
781impl From<Flow> for HostFlow {
782    fn from(value: Flow) -> Self {
783        let mut nodes = IndexMap::new();
784        for (id, node) in value.nodes {
785            nodes.insert(id.clone(), HostNode::from(node));
786        }
787        let start = value
788            .entrypoints
789            .get("default")
790            .and_then(Value::as_str)
791            .and_then(|id| NodeId::from_str(id).ok())
792            .or_else(|| nodes.keys().next().cloned());
793        Self {
794            id: value.id.as_str().to_string(),
795            start,
796            nodes,
797        }
798    }
799}
800
801impl From<Node> for HostNode {
802    fn from(node: Node) -> Self {
803        let component_ref = node.component.id.as_str().to_string();
804        let kind = match component_ref.as_str() {
805            "component.exec" => {
806                let target = extract_target_component(&node.input.mapping)
807                    .unwrap_or_else(|| "component.exec".to_string());
808                if target.starts_with("emit.") {
809                    NodeKind::BuiltinEmit {
810                        kind: emit_kind_from_ref(&target),
811                    }
812                } else {
813                    NodeKind::Exec {
814                        target_component: target,
815                    }
816                }
817            }
818            "flow.call" => NodeKind::FlowCall,
819            "provider.invoke" => NodeKind::ProviderInvoke,
820            "session.wait" => NodeKind::Wait,
821            comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
822                kind: emit_kind_from_ref(comp),
823            },
824            other => NodeKind::PackComponent {
825                component_ref: other.to_string(),
826            },
827        };
828        let component_label = match &kind {
829            NodeKind::Exec { .. } => "component.exec".to_string(),
830            NodeKind::PackComponent { component_ref } => component_ref.clone(),
831            NodeKind::ProviderInvoke => "provider.invoke".to_string(),
832            NodeKind::FlowCall => "flow.call".to_string(),
833            NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
834            NodeKind::Wait => "session.wait".to_string(),
835        };
836        let payload_expr = match kind {
837            NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
838            _ => node.input.mapping.clone(),
839        };
840        Self {
841            kind,
842            component: component_label,
843            payload_expr,
844            routing: node.routing,
845        }
846    }
847}
848
849fn extract_target_component(payload: &Value) -> Option<String> {
850    match payload {
851        Value::Object(map) => map
852            .get("component")
853            .or_else(|| map.get("component_ref"))
854            .and_then(Value::as_str)
855            .map(|s| s.to_string()),
856        _ => None,
857    }
858}
859
860fn extract_emit_payload(payload: &Value) -> Value {
861    if let Value::Object(map) = payload {
862        if let Some(input) = map.get("input") {
863            return input.clone();
864        }
865        if let Some(inner) = map.get("payload") {
866            return inner.clone();
867        }
868    }
869    payload.clone()
870}
871
872fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
873    match component_ref {
874        "emit.log" => EmitKind::Log,
875        "emit.response" => EmitKind::Response,
876        other => EmitKind::Other(other.to_string()),
877    }
878}
879
880fn emit_ref_from_kind(kind: &EmitKind) -> String {
881    match kind {
882        EmitKind::Log => "emit.log".to_string(),
883        EmitKind::Response => "emit.response".to_string(),
884        EmitKind::Other(other) => other.clone(),
885    }
886}
887
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// The state context must expose recorded node outputs under `nodes.<id>`.
    #[test]
    fn context_exposes_node_outputs() {
        let mut state = ExecutionState::new(json!({ "city": "London" }));
        state.nodes.insert(
            "forecast".to_string(),
            NodeOutput::new(json!({ "temp": "20C" })),
        );

        // templating handled via component now; ensure context still includes node outputs
        let ctx = state.context();
        assert_eq!(ctx["input"]["city"], json!("London"));
        assert_eq!(ctx["nodes"]["forecast"]["ok"], json!(true));
        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
    }

    #[test]
    fn finalize_wraps_emitted_payloads() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "first" }));
        state.push_egress(json!({ "text": "second" }));
        let result = state.finalize_with(Some(json!({ "text": "final" })));
        assert_eq!(
            result,
            json!([
                { "text": "first" },
                { "text": "second" },
                { "text": "final" }
            ])
        );
    }

    #[test]
    fn finalize_flattens_final_array() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "only" }));
        let result = state.finalize_with(Some(json!([
            { "text": "extra-1" },
            { "text": "extra-2" }
        ])));
        assert_eq!(
            result,
            json!([
                { "text": "only" },
                { "text": "extra-1" },
                { "text": "extra-2" }
            ])
        );
    }

    /// Without emitted values the final payload is returned untouched.
    #[test]
    fn finalize_passes_payload_through_without_egress() {
        let state = ExecutionState::new(json!({}));
        let result = state.finalize_with(Some(json!({ "text": "final" })));
        assert_eq!(result, json!({ "text": "final" }));

        let state = ExecutionState::new(json!({}));
        assert_eq!(state.finalize_with(None), Value::Null);
    }

    /// A null final payload must not add an element to the emitted values.
    #[test]
    fn finalize_skips_null_final_payload() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "only" }));
        let result = state.finalize_with(Some(Value::Null));
        assert_eq!(result, json!([{ "text": "only" }]));
    }

    #[test]
    fn apply_mapping_resolves_pointers_and_keeps_literals() {
        let ctx = json!({ "input": { "x": 1 } });
        let mapped = apply_mapping(&json!({ "a": "/input/x", "b": ["plain", 2] }), &ctx)
            .expect("mapping resolves");
        assert_eq!(mapped, json!({ "a": 1, "b": ["plain", 2] }));
        assert!(apply_mapping(&json!("/missing"), &ctx).is_err());
    }

    #[test]
    fn wait_reason_comes_from_string_or_reason_field() {
        assert_eq!(
            extract_wait_reason(&json!("pause")),
            Some("pause".to_string())
        );
        assert_eq!(
            extract_wait_reason(&json!({ "reason": "approval" })),
            Some("approval".to_string())
        );
        assert_eq!(extract_wait_reason(&json!({ "other": 1 })), None);
        assert_eq!(extract_wait_reason(&json!(5)), None);
    }
}
940
941use tracing::Instrument;
942
943pub struct FlowContext<'a> {
944    pub tenant: &'a str,
945    pub flow_id: &'a str,
946    pub node_id: Option<&'a str>,
947    pub tool: Option<&'a str>,
948    pub action: Option<&'a str>,
949    pub session_id: Option<&'a str>,
950    pub provider_id: Option<&'a str>,
951    pub retry_config: RetryConfig,
952    pub observer: Option<&'a dyn ExecutionObserver>,
953    pub mocks: Option<&'a MockLayer>,
954}
955
/// Retry policy for a flow run.
#[derive(Clone, Copy)]
pub struct RetryConfig {
    /// Maximum number of attempts, counting the first one.
    pub max_attempts: u32,
    /// Base delay in milliseconds fed to the backoff computation.
    pub base_delay_ms: u64,
}
961
962fn should_retry(err: &anyhow::Error) -> bool {
963    let lower = err.to_string().to_lowercase();
964    lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
965}
966
967impl From<FlowRetryConfig> for RetryConfig {
968    fn from(value: FlowRetryConfig) -> Self {
969        Self {
970            max_attempts: value.max_attempts.max(1),
971            base_delay_ms: value.base_delay_ms.max(50),
972        }
973    }
974}