greentic_runner_host/runner/
engine.rs

1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use crate::config::{FlowRetryConfig, HostConfig};
18use crate::pack::{FlowDescriptor, PackRuntime};
19use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
20use greentic_types::{Flow, Node, NodeId, Routing};
21
22pub struct FlowEngine {
23    packs: Vec<Arc<PackRuntime>>,
24    flows: Vec<FlowDescriptor>,
25    flow_sources: HashMap<String, usize>,
26    flow_cache: RwLock<HashMap<String, HostFlow>>,
27    default_env: String,
28}
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
31pub struct FlowSnapshot {
32    pub flow_id: String,
33    pub next_node: String,
34    pub state: ExecutionState,
35}
36
37#[derive(Clone, Debug)]
38pub struct FlowWait {
39    pub reason: Option<String>,
40    pub snapshot: FlowSnapshot,
41}
42
43#[derive(Clone, Debug)]
44pub enum FlowStatus {
45    Completed,
46    Waiting(FlowWait),
47}
48
49#[derive(Clone, Debug)]
50pub struct FlowExecution {
51    pub output: Value,
52    pub status: FlowStatus,
53}
54
55#[derive(Clone, Debug)]
56struct HostFlow {
57    id: String,
58    start: Option<NodeId>,
59    nodes: IndexMap<NodeId, HostNode>,
60}
61
62#[derive(Clone, Debug)]
63pub struct HostNode {
64    kind: NodeKind,
65    /// Backwards-compatible component label for observers/transcript.
66    pub component: String,
67    payload_expr: Value,
68    routing: Routing,
69}
70
71#[derive(Clone, Debug)]
72enum NodeKind {
73    Exec { target_component: String },
74    PackComponent { component_ref: String },
75    FlowCall,
76    BuiltinEmit { kind: EmitKind },
77    Wait,
78}
79
80#[derive(Clone, Debug)]
81enum EmitKind {
82    Log,
83    Response,
84    Other(String),
85}
86
87impl FlowExecution {
88    fn completed(output: Value) -> Self {
89        Self {
90            output,
91            status: FlowStatus::Completed,
92        }
93    }
94
95    fn waiting(output: Value, wait: FlowWait) -> Self {
96        Self {
97            output,
98            status: FlowStatus::Waiting(wait),
99        }
100    }
101}
102
103impl FlowEngine {
104    pub async fn new(packs: Vec<Arc<PackRuntime>>, _config: Arc<HostConfig>) -> Result<Self> {
105        let mut flow_sources = HashMap::new();
106        let mut descriptors = Vec::new();
107        for (idx, pack) in packs.iter().enumerate() {
108            let flows = pack.list_flows().await?;
109            for flow in flows {
110                tracing::info!(
111                    flow_id = %flow.id,
112                    flow_type = %flow.flow_type,
113                    pack_index = idx,
114                    "registered flow"
115                );
116                flow_sources.insert(flow.id.clone(), idx);
117                descriptors.retain(|existing: &FlowDescriptor| existing.id != flow.id);
118                descriptors.push(flow);
119            }
120        }
121
122        let mut flow_map = HashMap::new();
123        for flow in &descriptors {
124            if let Some(&pack_idx) = flow_sources.get(&flow.id) {
125                let pack_clone = Arc::clone(&packs[pack_idx]);
126                let flow_id = flow.id.clone();
127                let task_flow_id = flow_id.clone();
128                match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
129                    Ok(Ok(flow)) => {
130                        flow_map.insert(flow_id, HostFlow::from(flow));
131                    }
132                    Ok(Err(err)) => {
133                        tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
134                    }
135                    Err(err) => {
136                        tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
137                    }
138                }
139            }
140        }
141
142        Ok(Self {
143            packs,
144            flows: descriptors,
145            flow_sources,
146            flow_cache: RwLock::new(flow_map),
147            default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
148        })
149    }
150
151    async fn get_or_load_flow(&self, flow_id: &str) -> Result<HostFlow> {
152        if let Some(flow) = self.flow_cache.read().get(flow_id).cloned() {
153            return Ok(flow);
154        }
155
156        let pack_idx = *self
157            .flow_sources
158            .get(flow_id)
159            .with_context(|| format!("flow {flow_id} not registered"))?;
160        let pack = Arc::clone(&self.packs[pack_idx]);
161        let flow_id_owned = flow_id.to_string();
162        let task_flow_id = flow_id_owned.clone();
163        let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
164            .await
165            .context("failed to join flow metadata task")??;
166        let host_flow = HostFlow::from(flow);
167        self.flow_cache
168            .write()
169            .insert(flow_id_owned.clone(), host_flow.clone());
170        Ok(host_flow)
171    }
172
173    pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
174        let span = tracing::info_span!(
175            "flow.execute",
176            tenant = tracing::field::Empty,
177            flow_id = tracing::field::Empty,
178            node_id = tracing::field::Empty,
179            tool = tracing::field::Empty,
180            action = tracing::field::Empty
181        );
182        annotate_span(
183            &span,
184            &FlowSpanAttributes {
185                tenant: ctx.tenant,
186                flow_id: ctx.flow_id,
187                node_id: ctx.node_id,
188                tool: ctx.tool,
189                action: ctx.action,
190            },
191        );
192        set_flow_context(
193            &self.default_env,
194            ctx.tenant,
195            ctx.flow_id,
196            ctx.node_id,
197            ctx.provider_id,
198            ctx.session_id,
199        );
200        let retry_config = ctx.retry_config;
201        let original_input = input;
202        async move {
203            let mut attempt = 0u32;
204            loop {
205                attempt += 1;
206                match self.execute_once(&ctx, original_input.clone()).await {
207                    Ok(value) => return Ok(value),
208                    Err(err) => {
209                        if attempt >= retry_config.max_attempts || !should_retry(&err) {
210                            return Err(err);
211                        }
212                        let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
213                        tracing::warn!(
214                            tenant = ctx.tenant,
215                            flow_id = ctx.flow_id,
216                            attempt,
217                            max_attempts = retry_config.max_attempts,
218                            delay_ms = delay,
219                            error = %err,
220                            "transient flow execution failure, backing off"
221                        );
222                        tokio::time::sleep(Duration::from_millis(delay)).await;
223                    }
224                }
225            }
226        }
227        .instrument(span)
228        .await
229    }
230
231    pub async fn resume(
232        &self,
233        ctx: FlowContext<'_>,
234        snapshot: FlowSnapshot,
235        input: Value,
236    ) -> Result<FlowExecution> {
237        if snapshot.flow_id != ctx.flow_id {
238            bail!(
239                "snapshot flow {} does not match requested {}",
240                snapshot.flow_id,
241                ctx.flow_id
242            );
243        }
244        let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
245        let mut state = snapshot.state;
246        state.replace_input(input);
247        self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
248            .await
249    }
250
251    async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
252        let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
253        let state = ExecutionState::new(input);
254        self.drive_flow(ctx, flow_ir, state, None).await
255    }
256
257    async fn drive_flow(
258        &self,
259        ctx: &FlowContext<'_>,
260        flow_ir: HostFlow,
261        mut state: ExecutionState,
262        resume_from: Option<String>,
263    ) -> Result<FlowExecution> {
264        let mut current = match resume_from {
265            Some(node) => NodeId::from_str(&node)
266                .with_context(|| format!("invalid resume node id `{node}`"))?,
267            None => flow_ir
268                .start
269                .clone()
270                .or_else(|| flow_ir.nodes.keys().next().cloned())
271                .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
272        };
273
274        loop {
275            let node = flow_ir
276                .nodes
277                .get(&current)
278                .with_context(|| format!("node {} not found", current.as_str()))?;
279
280            let payload = node.payload_expr.clone();
281            let observed_payload = payload.clone();
282            let node_id = current.clone();
283            let event = NodeEvent {
284                context: ctx,
285                node_id: node_id.as_str(),
286                node,
287                payload: &observed_payload,
288            };
289            if let Some(observer) = ctx.observer {
290                observer.on_node_start(&event);
291            }
292            let DispatchOutcome {
293                output,
294                wait_reason,
295            } = self
296                .dispatch_node(ctx, node_id.as_str(), node, &mut state, payload)
297                .await?;
298
299            state.nodes.insert(node_id.clone().into(), output.clone());
300
301            let (next, should_exit) = match &node.routing {
302                Routing::Next { node_id } => (Some(node_id.clone()), false),
303                Routing::End | Routing::Reply => (None, true),
304                Routing::Branch { default, .. } => (default.clone(), default.is_none()),
305                Routing::Custom(raw) => {
306                    tracing::warn!(
307                        flow_id = %flow_ir.id,
308                        node_id = %node_id,
309                        routing = ?raw,
310                        "unsupported routing; terminating flow"
311                    );
312                    (None, true)
313                }
314            };
315
316            if let Some(wait_reason) = wait_reason {
317                let resume_target = next.clone().ok_or_else(|| {
318                    anyhow!(
319                        "session.wait node {} requires a non-empty route",
320                        current.as_str()
321                    )
322                })?;
323                let mut snapshot_state = state.clone();
324                snapshot_state.clear_egress();
325                let snapshot = FlowSnapshot {
326                    flow_id: ctx.flow_id.to_string(),
327                    next_node: resume_target.as_str().to_string(),
328                    state: snapshot_state,
329                };
330                let output_value = state.clone().finalize_with(None);
331                return Ok(FlowExecution::waiting(
332                    output_value,
333                    FlowWait {
334                        reason: Some(wait_reason),
335                        snapshot,
336                    },
337                ));
338            }
339
340            if should_exit {
341                return Ok(FlowExecution::completed(
342                    state.finalize_with(Some(output.payload.clone())),
343                ));
344            }
345
346            match next {
347                Some(n) => current = n,
348                None => {
349                    return Ok(FlowExecution::completed(
350                        state.finalize_with(Some(output.payload.clone())),
351                    ));
352                }
353            }
354        }
355    }
356
357    async fn dispatch_node(
358        &self,
359        ctx: &FlowContext<'_>,
360        node_id: &str,
361        node: &HostNode,
362        state: &mut ExecutionState,
363        payload: Value,
364    ) -> Result<DispatchOutcome> {
365        match &node.kind {
366            NodeKind::Exec { target_component } => self
367                .execute_component_exec(
368                    ctx,
369                    node_id,
370                    state,
371                    payload,
372                    Some(target_component.as_str()),
373                )
374                .await
375                .map(DispatchOutcome::complete),
376            NodeKind::PackComponent { component_ref } => self
377                .execute_component_exec(ctx, node_id, state, payload, Some(component_ref.as_str()))
378                .await
379                .map(DispatchOutcome::complete),
380            NodeKind::FlowCall => self
381                .execute_flow_call(ctx, payload)
382                .await
383                .map(DispatchOutcome::complete),
384            NodeKind::BuiltinEmit { kind } => {
385                match kind {
386                    EmitKind::Log | EmitKind::Response => {}
387                    EmitKind::Other(component) => {
388                        tracing::debug!(%component, "handling emit.* as builtin");
389                    }
390                }
391                state.push_egress(payload.clone());
392                Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
393            }
394            NodeKind::Wait => {
395                let reason = extract_wait_reason(&payload);
396                Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
397            }
398        }
399    }
400
401    async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
402        #[derive(Deserialize)]
403        struct FlowCallPayload {
404            #[serde(alias = "flow")]
405            flow_id: String,
406            #[serde(default)]
407            input: Value,
408        }
409
410        let call: FlowCallPayload =
411            serde_json::from_value(payload).context("invalid payload for flow.call node")?;
412        if call.flow_id.trim().is_empty() {
413            bail!("flow.call requires a non-empty flow_id");
414        }
415
416        let sub_input = if call.input.is_null() {
417            Value::Null
418        } else {
419            call.input
420        };
421
422        let flow_id_owned = call.flow_id;
423        let action = "flow.call";
424        let sub_ctx = FlowContext {
425            tenant: ctx.tenant,
426            flow_id: flow_id_owned.as_str(),
427            node_id: None,
428            tool: ctx.tool,
429            action: Some(action),
430            session_id: ctx.session_id,
431            provider_id: ctx.provider_id,
432            retry_config: ctx.retry_config,
433            observer: ctx.observer,
434            mocks: ctx.mocks,
435        };
436
437        let execution = Box::pin(self.execute(sub_ctx, sub_input))
438            .await
439            .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
440        match execution.status {
441            FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
442            FlowStatus::Waiting(wait) => bail!(
443                "flow.call cannot pause (flow {} waiting {:?})",
444                flow_id_owned,
445                wait.reason
446            ),
447        }
448    }
449
450    async fn execute_component_exec(
451        &self,
452        ctx: &FlowContext<'_>,
453        node_id: &str,
454        state: &ExecutionState,
455        payload: Value,
456        component_override: Option<&str>,
457    ) -> Result<NodeOutput> {
458        #[derive(Deserialize)]
459        struct ComponentPayload {
460            #[serde(default, alias = "component_ref", alias = "component")]
461            component: Option<String>,
462            #[serde(alias = "op")]
463            operation: Option<String>,
464            #[serde(default)]
465            input: Value,
466            #[serde(default)]
467            config: Value,
468        }
469
470        let payload: ComponentPayload =
471            serde_json::from_value(payload).context("invalid payload for component.exec")?;
472        let component_ref = component_override
473            .map(str::to_string)
474            .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
475            .with_context(|| "component.exec requires a component_ref")?;
476        let operation = payload
477            .operation
478            .filter(|v| !v.trim().is_empty())
479            .with_context(|| "component.exec requires an operation")?;
480        let mut input = payload.input;
481        if let Value::Object(mut map) = input {
482            map.entry("state".to_string())
483                .or_insert_with(|| state.context());
484            input = Value::Object(map);
485        }
486        let input_json = serde_json::to_string(&input)?;
487        let config_json = if payload.config.is_null() {
488            None
489        } else {
490            Some(serde_json::to_string(&payload.config)?)
491        };
492
493        let pack_idx = *self
494            .flow_sources
495            .get(ctx.flow_id)
496            .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
497        let pack = Arc::clone(&self.packs[pack_idx]);
498        let exec_ctx = component_exec_ctx(ctx, node_id);
499        let value = pack
500            .invoke_component(
501                &component_ref,
502                exec_ctx,
503                &operation,
504                config_json,
505                input_json,
506            )
507            .await?;
508
509        Ok(NodeOutput::new(value))
510    }
511
512    pub fn flows(&self) -> &[FlowDescriptor] {
513        &self.flows
514    }
515
516    pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
517        self.flows
518            .iter()
519            .find(|descriptor| descriptor.flow_type == flow_type)
520    }
521
522    pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
523        self.flows
524            .iter()
525            .find(|descriptor| descriptor.id == flow_id)
526    }
527}
528
529pub trait ExecutionObserver: Send + Sync {
530    fn on_node_start(&self, event: &NodeEvent<'_>);
531    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
532    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
533}
534
535pub struct NodeEvent<'a> {
536    pub context: &'a FlowContext<'a>,
537    pub node_id: &'a str,
538    pub node: &'a HostNode,
539    pub payload: &'a Value,
540}
541
542#[derive(Clone, Debug, Serialize, Deserialize)]
543pub struct ExecutionState {
544    input: Value,
545    nodes: HashMap<String, NodeOutput>,
546    egress: Vec<Value>,
547}
548
549impl ExecutionState {
550    fn new(input: Value) -> Self {
551        Self {
552            input,
553            nodes: HashMap::new(),
554            egress: Vec::new(),
555        }
556    }
557
558    fn context(&self) -> Value {
559        let mut nodes = JsonMap::new();
560        for (id, output) in &self.nodes {
561            nodes.insert(
562                id.clone(),
563                json!({
564                    "ok": output.ok,
565                    "payload": output.payload.clone(),
566                    "meta": output.meta.clone(),
567                }),
568            );
569        }
570        json!({
571            "input": self.input.clone(),
572            "nodes": nodes,
573        })
574    }
575    fn push_egress(&mut self, payload: Value) {
576        self.egress.push(payload);
577    }
578
579    fn replace_input(&mut self, input: Value) {
580        self.input = input;
581    }
582
583    fn clear_egress(&mut self) {
584        self.egress.clear();
585    }
586
587    fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
588        if self.egress.is_empty() {
589            return final_payload.unwrap_or(Value::Null);
590        }
591        let mut emitted = std::mem::take(&mut self.egress);
592        if let Some(value) = final_payload {
593            match value {
594                Value::Null => {}
595                Value::Array(items) => emitted.extend(items),
596                other => emitted.push(other),
597            }
598        }
599        Value::Array(emitted)
600    }
601}
602
603#[derive(Clone, Debug, Serialize, Deserialize)]
604struct NodeOutput {
605    ok: bool,
606    payload: Value,
607    meta: Value,
608}
609
610impl NodeOutput {
611    fn new(payload: Value) -> Self {
612        Self {
613            ok: true,
614            payload,
615            meta: Value::Null,
616        }
617    }
618}
619
620struct DispatchOutcome {
621    output: NodeOutput,
622    wait_reason: Option<String>,
623}
624
625impl DispatchOutcome {
626    fn complete(output: NodeOutput) -> Self {
627        Self {
628            output,
629            wait_reason: None,
630        }
631    }
632
633    fn wait(output: NodeOutput, reason: Option<String>) -> Self {
634        Self {
635            output,
636            wait_reason: reason,
637        }
638    }
639}
640
641fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
642    ComponentExecCtx {
643        tenant: ComponentTenantCtx {
644            tenant: ctx.tenant.to_string(),
645            team: None,
646            user: ctx.provider_id.map(str::to_string),
647            trace_id: None,
648            correlation_id: ctx.session_id.map(str::to_string),
649            deadline_unix_ms: None,
650            attempt: 1,
651            idempotency_key: ctx.session_id.map(str::to_string),
652        },
653        flow_id: ctx.flow_id.to_string(),
654        node_id: Some(node_id.to_string()),
655    }
656}
657
658fn extract_wait_reason(payload: &Value) -> Option<String> {
659    match payload {
660        Value::String(s) => Some(s.clone()),
661        Value::Object(map) => map
662            .get("reason")
663            .and_then(Value::as_str)
664            .map(|value| value.to_string()),
665        _ => None,
666    }
667}
668
669impl From<Flow> for HostFlow {
670    fn from(value: Flow) -> Self {
671        let mut nodes = IndexMap::new();
672        for (id, node) in value.nodes {
673            nodes.insert(id.clone(), HostNode::from(node));
674        }
675        let start = value
676            .entrypoints
677            .get("default")
678            .and_then(Value::as_str)
679            .and_then(|id| NodeId::from_str(id).ok())
680            .or_else(|| nodes.keys().next().cloned());
681        Self {
682            id: value.id.as_str().to_string(),
683            start,
684            nodes,
685        }
686    }
687}
688
689impl From<Node> for HostNode {
690    fn from(node: Node) -> Self {
691        let component_ref = node.component.id.as_str().to_string();
692        let kind = match component_ref.as_str() {
693            "component.exec" => {
694                let target = extract_target_component(&node.input.mapping)
695                    .unwrap_or_else(|| "component.exec".to_string());
696                if target.starts_with("emit.") {
697                    NodeKind::BuiltinEmit {
698                        kind: emit_kind_from_ref(&target),
699                    }
700                } else {
701                    NodeKind::Exec {
702                        target_component: target,
703                    }
704                }
705            }
706            "flow.call" => NodeKind::FlowCall,
707            "session.wait" => NodeKind::Wait,
708            comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
709                kind: emit_kind_from_ref(comp),
710            },
711            other => NodeKind::PackComponent {
712                component_ref: other.to_string(),
713            },
714        };
715        let component_label = match &kind {
716            NodeKind::Exec { .. } => "component.exec".to_string(),
717            NodeKind::PackComponent { component_ref } => component_ref.clone(),
718            NodeKind::FlowCall => "flow.call".to_string(),
719            NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
720            NodeKind::Wait => "session.wait".to_string(),
721        };
722        let payload_expr = match kind {
723            NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
724            _ => node.input.mapping.clone(),
725        };
726        Self {
727            kind,
728            component: component_label,
729            payload_expr,
730            routing: node.routing,
731        }
732    }
733}
734
735fn extract_target_component(payload: &Value) -> Option<String> {
736    match payload {
737        Value::Object(map) => map
738            .get("component")
739            .or_else(|| map.get("component_ref"))
740            .and_then(Value::as_str)
741            .map(|s| s.to_string()),
742        _ => None,
743    }
744}
745
746fn extract_emit_payload(payload: &Value) -> Value {
747    if let Value::Object(map) = payload {
748        if let Some(input) = map.get("input") {
749            return input.clone();
750        }
751        if let Some(inner) = map.get("payload") {
752            return inner.clone();
753        }
754    }
755    payload.clone()
756}
757
758fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
759    match component_ref {
760        "emit.log" => EmitKind::Log,
761        "emit.response" => EmitKind::Response,
762        other => EmitKind::Other(other.to_string()),
763    }
764}
765
766fn emit_ref_from_kind(kind: &EmitKind) -> String {
767    match kind {
768        EmitKind::Log => "emit.log".to_string(),
769        EmitKind::Response => "emit.response".to_string(),
770        EmitKind::Other(other) => other.clone(),
771    }
772}
773
774#[cfg(test)]
775mod tests {
776    use super::*;
777    use serde_json::json;
778
779    #[test]
780    fn templating_renders_with_partials_and_data() {
781        let mut state = ExecutionState::new(json!({ "city": "London" }));
782        state.nodes.insert(
783            "forecast".to_string(),
784            NodeOutput::new(json!({ "temp": "20C" })),
785        );
786
787        // templating handled via component now; ensure context still includes node outputs
788        let ctx = state.context();
789        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
790    }
791
792    #[test]
793    fn finalize_wraps_emitted_payloads() {
794        let mut state = ExecutionState::new(json!({}));
795        state.push_egress(json!({ "text": "first" }));
796        state.push_egress(json!({ "text": "second" }));
797        let result = state.finalize_with(Some(json!({ "text": "final" })));
798        assert_eq!(
799            result,
800            json!([
801                { "text": "first" },
802                { "text": "second" },
803                { "text": "final" }
804            ])
805        );
806    }
807
808    #[test]
809    fn finalize_flattens_final_array() {
810        let mut state = ExecutionState::new(json!({}));
811        state.push_egress(json!({ "text": "only" }));
812        let result = state.finalize_with(Some(json!([
813            { "text": "extra-1" },
814            { "text": "extra-2" }
815        ])));
816        assert_eq!(
817            result,
818            json!([
819                { "text": "only" },
820                { "text": "extra-1" },
821                { "text": "extra-2" }
822            ])
823        );
824    }
825}
826
827use tracing::Instrument;
828
829pub struct FlowContext<'a> {
830    pub tenant: &'a str,
831    pub flow_id: &'a str,
832    pub node_id: Option<&'a str>,
833    pub tool: Option<&'a str>,
834    pub action: Option<&'a str>,
835    pub session_id: Option<&'a str>,
836    pub provider_id: Option<&'a str>,
837    pub retry_config: RetryConfig,
838    pub observer: Option<&'a dyn ExecutionObserver>,
839    pub mocks: Option<&'a MockLayer>,
840}
841
842#[derive(Copy, Clone)]
843pub struct RetryConfig {
844    pub max_attempts: u32,
845    pub base_delay_ms: u64,
846}
847
848fn should_retry(err: &anyhow::Error) -> bool {
849    let lower = err.to_string().to_lowercase();
850    lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
851}
852
853impl From<FlowRetryConfig> for RetryConfig {
854    fn from(value: FlowRetryConfig) -> Self {
855        Self {
856            max_attempts: value.max_attempts.max(1),
857            base_delay_ms: value.base_delay_ms.max(50),
858        }
859    }
860}