greentic_runner_host/runner/
engine.rs

1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::sync::Arc;
5use std::time::Duration;
6
7use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
8use anyhow::{Context, Result, anyhow, bail};
9use greentic_flow::ir::{FlowIR, NodeIR, RouteIR};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use crate::config::{FlowRetryConfig, HostConfig};
18use crate::pack::{FlowDescriptor, PackRuntime};
19use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
20
21pub struct FlowEngine {
22    packs: Vec<Arc<PackRuntime>>,
23    flows: Vec<FlowDescriptor>,
24    flow_sources: HashMap<String, usize>,
25    flow_ir: RwLock<HashMap<String, HostFlowIR>>,
26    default_env: String,
27}
28
29#[derive(Clone, Debug, Serialize, Deserialize)]
30pub struct FlowSnapshot {
31    pub flow_id: String,
32    pub next_node: String,
33    pub state: ExecutionState,
34}
35
36#[derive(Clone, Debug)]
37pub struct FlowWait {
38    pub reason: Option<String>,
39    pub snapshot: FlowSnapshot,
40}
41
42#[derive(Clone, Debug)]
43pub enum FlowStatus {
44    Completed,
45    Waiting(FlowWait),
46}
47
48#[derive(Clone, Debug)]
49pub struct FlowExecution {
50    pub output: Value,
51    pub status: FlowStatus,
52}
53
54#[derive(Clone, Debug)]
55#[allow(dead_code)]
56struct HostFlowIR {
57    id: String,
58    flow_type: String,
59    start: Option<String>,
60    parameters: Value,
61    nodes: IndexMap<String, HostNode>,
62}
63
64#[derive(Clone, Debug)]
65pub struct HostNode {
66    kind: NodeKind,
67    /// Backwards-compatible component label for observers/transcript.
68    pub component: String,
69    payload_expr: Value,
70    routes: Vec<RouteIR>,
71}
72
73#[derive(Clone, Debug)]
74enum NodeKind {
75    Exec { target_component: String },
76    PackComponent { component_ref: String },
77    FlowCall,
78    BuiltinEmit { kind: EmitKind },
79    Wait,
80}
81
/// Flavour of a built-in `emit.*` node.
#[derive(Debug, Clone)]
enum EmitKind {
    /// `emit.log`
    Log,
    /// `emit.response`
    Response,
    /// Any other `emit.*` reference, kept verbatim.
    Other(String),
}
88
89impl FlowExecution {
90    fn completed(output: Value) -> Self {
91        Self {
92            output,
93            status: FlowStatus::Completed,
94        }
95    }
96
97    fn waiting(output: Value, wait: FlowWait) -> Self {
98        Self {
99            output,
100            status: FlowStatus::Waiting(wait),
101        }
102    }
103}
104
105impl FlowEngine {
106    pub async fn new(packs: Vec<Arc<PackRuntime>>, _config: Arc<HostConfig>) -> Result<Self> {
107        let mut flow_sources = HashMap::new();
108        let mut descriptors = Vec::new();
109        for (idx, pack) in packs.iter().enumerate() {
110            let flows = pack.list_flows().await?;
111            for flow in flows {
112                tracing::info!(
113                    flow_id = %flow.id,
114                    flow_type = %flow.flow_type,
115                    pack_index = idx,
116                    "registered flow"
117                );
118                flow_sources.insert(flow.id.clone(), idx);
119                descriptors.retain(|existing: &FlowDescriptor| existing.id != flow.id);
120                descriptors.push(flow);
121            }
122        }
123
124        let mut ir_map = HashMap::new();
125        for flow in &descriptors {
126            if let Some(&pack_idx) = flow_sources.get(&flow.id) {
127                let pack_clone = Arc::clone(&packs[pack_idx]);
128                let flow_id = flow.id.clone();
129                let task_flow_id = flow_id.clone();
130                match task::spawn_blocking(move || pack_clone.load_flow_ir(&task_flow_id)).await {
131                    Ok(Ok(ir)) => {
132                        ir_map.insert(flow_id, HostFlowIR::from(ir));
133                    }
134                    Ok(Err(err)) => {
135                        tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
136                    }
137                    Err(err) => {
138                        tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
139                    }
140                }
141            }
142        }
143
144        Ok(Self {
145            packs,
146            flows: descriptors,
147            flow_sources,
148            flow_ir: RwLock::new(ir_map),
149            default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
150        })
151    }
152
153    async fn get_or_load_flow_ir(&self, flow_id: &str) -> Result<HostFlowIR> {
154        if let Some(ir) = self.flow_ir.read().get(flow_id).cloned() {
155            return Ok(ir);
156        }
157
158        let pack_idx = *self
159            .flow_sources
160            .get(flow_id)
161            .with_context(|| format!("flow {flow_id} not registered"))?;
162        let pack = Arc::clone(&self.packs[pack_idx]);
163        let flow_id_owned = flow_id.to_string();
164        let task_flow_id = flow_id_owned.clone();
165        let ir = task::spawn_blocking(move || pack.load_flow_ir(&task_flow_id))
166            .await
167            .context("failed to join flow metadata task")??;
168        let host_ir = HostFlowIR::from(ir);
169        self.flow_ir
170            .write()
171            .insert(flow_id_owned.clone(), host_ir.clone());
172        Ok(host_ir)
173    }
174
175    pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
176        let span = tracing::info_span!(
177            "flow.execute",
178            tenant = tracing::field::Empty,
179            flow_id = tracing::field::Empty,
180            node_id = tracing::field::Empty,
181            tool = tracing::field::Empty,
182            action = tracing::field::Empty
183        );
184        annotate_span(
185            &span,
186            &FlowSpanAttributes {
187                tenant: ctx.tenant,
188                flow_id: ctx.flow_id,
189                node_id: ctx.node_id,
190                tool: ctx.tool,
191                action: ctx.action,
192            },
193        );
194        set_flow_context(
195            &self.default_env,
196            ctx.tenant,
197            ctx.flow_id,
198            ctx.node_id,
199            ctx.provider_id,
200            ctx.session_id,
201        );
202        let retry_config = ctx.retry_config;
203        let original_input = input;
204        async move {
205            let mut attempt = 0u32;
206            loop {
207                attempt += 1;
208                match self.execute_once(&ctx, original_input.clone()).await {
209                    Ok(value) => return Ok(value),
210                    Err(err) => {
211                        if attempt >= retry_config.max_attempts || !should_retry(&err) {
212                            return Err(err);
213                        }
214                        let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
215                        tracing::warn!(
216                            tenant = ctx.tenant,
217                            flow_id = ctx.flow_id,
218                            attempt,
219                            max_attempts = retry_config.max_attempts,
220                            delay_ms = delay,
221                            error = %err,
222                            "transient flow execution failure, backing off"
223                        );
224                        tokio::time::sleep(Duration::from_millis(delay)).await;
225                    }
226                }
227            }
228        }
229        .instrument(span)
230        .await
231    }
232
233    pub async fn resume(
234        &self,
235        ctx: FlowContext<'_>,
236        snapshot: FlowSnapshot,
237        input: Value,
238    ) -> Result<FlowExecution> {
239        if snapshot.flow_id != ctx.flow_id {
240            bail!(
241                "snapshot flow {} does not match requested {}",
242                snapshot.flow_id,
243                ctx.flow_id
244            );
245        }
246        let flow_ir = self.get_or_load_flow_ir(ctx.flow_id).await?;
247        let mut state = snapshot.state;
248        state.replace_input(input);
249        self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
250            .await
251    }
252
253    async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
254        let flow_ir = self.get_or_load_flow_ir(ctx.flow_id).await?;
255        let state = ExecutionState::new(input);
256        self.drive_flow(ctx, flow_ir, state, None).await
257    }
258
259    async fn drive_flow(
260        &self,
261        ctx: &FlowContext<'_>,
262        flow_ir: HostFlowIR,
263        mut state: ExecutionState,
264        resume_from: Option<String>,
265    ) -> Result<FlowExecution> {
266        let mut current = flow_ir
267            .start
268            .clone()
269            .or_else(|| flow_ir.nodes.keys().next().cloned())
270            .with_context(|| format!("flow {} has no start node", flow_ir.id))?;
271        if let Some(resume) = resume_from {
272            current = resume;
273        }
274        let mut final_payload = None;
275
276        loop {
277            let node = flow_ir
278                .nodes
279                .get(&current)
280                .with_context(|| format!("node {current} not found"))?;
281
282            let payload = node.payload_expr.clone();
283            let observed_payload = payload.clone();
284            let node_id = current.clone();
285            let event = NodeEvent {
286                context: ctx,
287                node_id: &node_id,
288                node,
289                payload: &observed_payload,
290            };
291            if let Some(observer) = ctx.observer {
292                observer.on_node_start(&event);
293            }
294            let DispatchOutcome {
295                output,
296                wait_reason,
297            } = self
298                .dispatch_node(ctx, &current, node, &mut state, payload)
299                .await?;
300
301            state.nodes.insert(current.clone(), output.clone());
302
303            let mut next = None;
304            let mut should_exit = false;
305            for route in &node.routes {
306                if route.out || matches!(route.to.as_deref(), Some("out")) {
307                    final_payload = Some(output.payload.clone());
308                    should_exit = true;
309                    break;
310                }
311                if let Some(to) = &route.to {
312                    next = Some(to.clone());
313                    break;
314                }
315            }
316
317            if let Some(wait_reason) = wait_reason {
318                let resume_target = next.clone().ok_or_else(|| {
319                    anyhow!("session.wait node {current} requires a non-empty route")
320                })?;
321                let mut snapshot_state = state.clone();
322                snapshot_state.clear_egress();
323                let snapshot = FlowSnapshot {
324                    flow_id: ctx.flow_id.to_string(),
325                    next_node: resume_target,
326                    state: snapshot_state,
327                };
328                let output_value = state.clone().finalize_with(None);
329                return Ok(FlowExecution::waiting(
330                    output_value,
331                    FlowWait {
332                        reason: Some(wait_reason),
333                        snapshot,
334                    },
335                ));
336            }
337
338            if should_exit {
339                break;
340            }
341
342            match next {
343                Some(n) => current = n,
344                None => {
345                    final_payload = Some(output.payload.clone());
346                    break;
347                }
348            }
349        }
350
351        let payload = final_payload.unwrap_or(Value::Null);
352        Ok(FlowExecution::completed(state.finalize_with(Some(payload))))
353    }
354
355    async fn dispatch_node(
356        &self,
357        ctx: &FlowContext<'_>,
358        node_id: &str,
359        node: &HostNode,
360        state: &mut ExecutionState,
361        payload: Value,
362    ) -> Result<DispatchOutcome> {
363        match &node.kind {
364            NodeKind::Exec { target_component } => self
365                .execute_component_exec(
366                    ctx,
367                    node_id,
368                    state,
369                    payload,
370                    Some(target_component.as_str()),
371                )
372                .await
373                .map(DispatchOutcome::complete),
374            NodeKind::PackComponent { component_ref } => self
375                .execute_component_exec(ctx, node_id, state, payload, Some(component_ref.as_str()))
376                .await
377                .map(DispatchOutcome::complete),
378            NodeKind::FlowCall => self
379                .execute_flow_call(ctx, payload)
380                .await
381                .map(DispatchOutcome::complete),
382            NodeKind::BuiltinEmit { kind } => {
383                match kind {
384                    EmitKind::Log | EmitKind::Response => {}
385                    EmitKind::Other(component) => {
386                        tracing::debug!(%component, "handling emit.* as builtin");
387                    }
388                }
389                state.push_egress(payload.clone());
390                Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
391            }
392            NodeKind::Wait => {
393                let reason = extract_wait_reason(&payload);
394                Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
395            }
396        }
397    }
398
399    async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
400        #[derive(Deserialize)]
401        struct FlowCallPayload {
402            #[serde(alias = "flow")]
403            flow_id: String,
404            #[serde(default)]
405            input: Value,
406        }
407
408        let call: FlowCallPayload =
409            serde_json::from_value(payload).context("invalid payload for flow.call node")?;
410        if call.flow_id.trim().is_empty() {
411            bail!("flow.call requires a non-empty flow_id");
412        }
413
414        let sub_input = if call.input.is_null() {
415            Value::Null
416        } else {
417            call.input
418        };
419
420        let flow_id_owned = call.flow_id;
421        let action = "flow.call";
422        let sub_ctx = FlowContext {
423            tenant: ctx.tenant,
424            flow_id: flow_id_owned.as_str(),
425            node_id: None,
426            tool: ctx.tool,
427            action: Some(action),
428            session_id: ctx.session_id,
429            provider_id: ctx.provider_id,
430            retry_config: ctx.retry_config,
431            observer: ctx.observer,
432            mocks: ctx.mocks,
433        };
434
435        let execution = Box::pin(self.execute(sub_ctx, sub_input))
436            .await
437            .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
438        match execution.status {
439            FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
440            FlowStatus::Waiting(wait) => bail!(
441                "flow.call cannot pause (flow {} waiting {:?})",
442                flow_id_owned,
443                wait.reason
444            ),
445        }
446    }
447
448    async fn execute_component_exec(
449        &self,
450        ctx: &FlowContext<'_>,
451        node_id: &str,
452        state: &ExecutionState,
453        payload: Value,
454        component_override: Option<&str>,
455    ) -> Result<NodeOutput> {
456        #[derive(Deserialize)]
457        struct ComponentPayload {
458            #[serde(default, alias = "component_ref", alias = "component")]
459            component: Option<String>,
460            #[serde(alias = "op")]
461            operation: Option<String>,
462            #[serde(default)]
463            input: Value,
464            #[serde(default)]
465            config: Value,
466        }
467
468        let payload: ComponentPayload =
469            serde_json::from_value(payload).context("invalid payload for component.exec")?;
470        let component_ref = component_override
471            .map(str::to_string)
472            .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
473            .with_context(|| "component.exec requires a component_ref")?;
474        let operation = payload
475            .operation
476            .filter(|v| !v.trim().is_empty())
477            .with_context(|| "component.exec requires an operation")?;
478        let mut input = payload.input;
479        if let Value::Object(mut map) = input {
480            map.entry("state".to_string())
481                .or_insert_with(|| state.context());
482            input = Value::Object(map);
483        }
484        let input_json = serde_json::to_string(&input)?;
485        let config_json = if payload.config.is_null() {
486            None
487        } else {
488            Some(serde_json::to_string(&payload.config)?)
489        };
490
491        let pack_idx = *self
492            .flow_sources
493            .get(ctx.flow_id)
494            .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
495        let pack = Arc::clone(&self.packs[pack_idx]);
496        let exec_ctx = component_exec_ctx(ctx, node_id);
497        let value = pack
498            .invoke_component(
499                &component_ref,
500                exec_ctx,
501                &operation,
502                config_json,
503                input_json,
504            )
505            .await?;
506
507        Ok(NodeOutput::new(value))
508    }
509
510    pub fn flows(&self) -> &[FlowDescriptor] {
511        &self.flows
512    }
513
514    pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
515        self.flows
516            .iter()
517            .find(|descriptor| descriptor.flow_type == flow_type)
518    }
519
520    pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
521        self.flows
522            .iter()
523            .find(|descriptor| descriptor.id == flow_id)
524    }
525}
526
527pub trait ExecutionObserver: Send + Sync {
528    fn on_node_start(&self, event: &NodeEvent<'_>);
529    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
530    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
531}
532
533pub struct NodeEvent<'a> {
534    pub context: &'a FlowContext<'a>,
535    pub node_id: &'a str,
536    pub node: &'a HostNode,
537    pub payload: &'a Value,
538}
539
540#[derive(Clone, Debug, Serialize, Deserialize)]
541pub struct ExecutionState {
542    input: Value,
543    nodes: HashMap<String, NodeOutput>,
544    egress: Vec<Value>,
545}
546
547impl ExecutionState {
548    fn new(input: Value) -> Self {
549        Self {
550            input,
551            nodes: HashMap::new(),
552            egress: Vec::new(),
553        }
554    }
555
556    fn context(&self) -> Value {
557        let mut nodes = JsonMap::new();
558        for (id, output) in &self.nodes {
559            nodes.insert(
560                id.clone(),
561                json!({
562                    "ok": output.ok,
563                    "payload": output.payload.clone(),
564                    "meta": output.meta.clone(),
565                }),
566            );
567        }
568        json!({
569            "input": self.input.clone(),
570            "nodes": nodes,
571        })
572    }
573    fn push_egress(&mut self, payload: Value) {
574        self.egress.push(payload);
575    }
576
577    fn replace_input(&mut self, input: Value) {
578        self.input = input;
579    }
580
581    fn clear_egress(&mut self) {
582        self.egress.clear();
583    }
584
585    fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
586        if self.egress.is_empty() {
587            return final_payload.unwrap_or(Value::Null);
588        }
589        let mut emitted = std::mem::take(&mut self.egress);
590        if let Some(value) = final_payload {
591            match value {
592                Value::Null => {}
593                Value::Array(items) => emitted.extend(items),
594                other => emitted.push(other),
595            }
596        }
597        Value::Array(emitted)
598    }
599}
600
601#[derive(Clone, Debug, Serialize, Deserialize)]
602struct NodeOutput {
603    ok: bool,
604    payload: Value,
605    meta: Value,
606}
607
608impl NodeOutput {
609    fn new(payload: Value) -> Self {
610        Self {
611            ok: true,
612            payload,
613            meta: Value::Null,
614        }
615    }
616}
617
618struct DispatchOutcome {
619    output: NodeOutput,
620    wait_reason: Option<String>,
621}
622
623impl DispatchOutcome {
624    fn complete(output: NodeOutput) -> Self {
625        Self {
626            output,
627            wait_reason: None,
628        }
629    }
630
631    fn wait(output: NodeOutput, reason: Option<String>) -> Self {
632        Self {
633            output,
634            wait_reason: reason,
635        }
636    }
637}
638
639fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
640    ComponentExecCtx {
641        tenant: ComponentTenantCtx {
642            tenant: ctx.tenant.to_string(),
643            team: None,
644            user: ctx.provider_id.map(str::to_string),
645            trace_id: None,
646            correlation_id: ctx.session_id.map(str::to_string),
647            deadline_unix_ms: None,
648            attempt: 1,
649            idempotency_key: ctx.session_id.map(str::to_string),
650        },
651        flow_id: ctx.flow_id.to_string(),
652        node_id: Some(node_id.to_string()),
653    }
654}
655
656fn extract_wait_reason(payload: &Value) -> Option<String> {
657    match payload {
658        Value::String(s) => Some(s.clone()),
659        Value::Object(map) => map
660            .get("reason")
661            .and_then(Value::as_str)
662            .map(|value| value.to_string()),
663        _ => None,
664    }
665}
666
667impl From<FlowIR> for HostFlowIR {
668    fn from(value: FlowIR) -> Self {
669        let nodes = value
670            .nodes
671            .into_iter()
672            .map(|(id, node)| (id, HostNode::from(node)))
673            .collect();
674        Self {
675            id: value.id,
676            flow_type: value.flow_type,
677            start: value.start,
678            parameters: value.parameters,
679            nodes,
680        }
681    }
682}
683
684impl From<NodeIR> for HostNode {
685    fn from(node: NodeIR) -> Self {
686        let kind = match node.component.as_str() {
687            "component.exec" => {
688                let target = extract_target_component(&node.payload_expr)
689                    .unwrap_or_else(|| "component.exec".to_string());
690                if target.starts_with("emit.") {
691                    NodeKind::BuiltinEmit {
692                        kind: emit_kind_from_ref(&target),
693                    }
694                } else {
695                    NodeKind::Exec {
696                        target_component: target,
697                    }
698                }
699            }
700            "flow.call" => NodeKind::FlowCall,
701            "session.wait" => NodeKind::Wait,
702            comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
703                kind: emit_kind_from_ref(comp),
704            },
705            other => NodeKind::PackComponent {
706                component_ref: other.to_string(),
707            },
708        };
709        let component_label = match &kind {
710            NodeKind::Exec { .. } => "component.exec".to_string(),
711            NodeKind::PackComponent { component_ref } => component_ref.clone(),
712            NodeKind::FlowCall => "flow.call".to_string(),
713            NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
714            NodeKind::Wait => "session.wait".to_string(),
715        };
716        let payload_expr = match kind {
717            NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.payload_expr),
718            _ => node.payload_expr.clone(),
719        };
720        Self {
721            kind,
722            component: component_label,
723            payload_expr,
724            routes: node.routes,
725        }
726    }
727}
728
729fn extract_target_component(payload: &Value) -> Option<String> {
730    match payload {
731        Value::Object(map) => map
732            .get("component")
733            .or_else(|| map.get("component_ref"))
734            .and_then(Value::as_str)
735            .map(|s| s.to_string()),
736        _ => None,
737    }
738}
739
740fn extract_emit_payload(payload: &Value) -> Value {
741    if let Value::Object(map) = payload {
742        if let Some(input) = map.get("input") {
743            return input.clone();
744        }
745        if let Some(inner) = map.get("payload") {
746            return inner.clone();
747        }
748    }
749    payload.clone()
750}
751
752fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
753    match component_ref {
754        "emit.log" => EmitKind::Log,
755        "emit.response" => EmitKind::Response,
756        other => EmitKind::Other(other.to_string()),
757    }
758}
759
760fn emit_ref_from_kind(kind: &EmitKind) -> String {
761    match kind {
762        EmitKind::Log => "emit.log".to_string(),
763        EmitKind::Response => "emit.response".to_string(),
764        EmitKind::Other(other) => other.clone(),
765    }
766}
767
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a state with empty input whose egress holds `emitted`, in order.
    fn state_with_egress(emitted: Vec<Value>) -> ExecutionState {
        let mut state = ExecutionState::new(json!({}));
        for value in emitted {
            state.push_egress(value);
        }
        state
    }

    #[test]
    fn templating_renders_with_partials_and_data() {
        let mut state = ExecutionState::new(json!({ "city": "London" }));
        let forecast = NodeOutput::new(json!({ "temp": "20C" }));
        state.nodes.insert("forecast".to_string(), forecast);

        // templating handled via component now; ensure context still includes node outputs
        let rendered = state.context();
        let temp = &rendered["nodes"]["forecast"]["payload"]["temp"];
        assert_eq!(*temp, json!("20C"));
    }

    #[test]
    fn finalize_wraps_emitted_payloads() {
        let state = state_with_egress(vec![
            json!({ "text": "first" }),
            json!({ "text": "second" }),
        ]);
        let expected = json!([
            { "text": "first" },
            { "text": "second" },
            { "text": "final" }
        ]);
        assert_eq!(
            state.finalize_with(Some(json!({ "text": "final" }))),
            expected
        );
    }

    #[test]
    fn finalize_flattens_final_array() {
        let state = state_with_egress(vec![json!({ "text": "only" })]);
        let trailing = json!([
            { "text": "extra-1" },
            { "text": "extra-2" }
        ]);
        let expected = json!([
            { "text": "only" },
            { "text": "extra-1" },
            { "text": "extra-2" }
        ]);
        assert_eq!(state.finalize_with(Some(trailing)), expected);
    }
}
820
821use tracing::Instrument;
822
823pub struct FlowContext<'a> {
824    pub tenant: &'a str,
825    pub flow_id: &'a str,
826    pub node_id: Option<&'a str>,
827    pub tool: Option<&'a str>,
828    pub action: Option<&'a str>,
829    pub session_id: Option<&'a str>,
830    pub provider_id: Option<&'a str>,
831    pub retry_config: RetryConfig,
832    pub observer: Option<&'a dyn ExecutionObserver>,
833    pub mocks: Option<&'a MockLayer>,
834}
835
/// Retry policy for flow execution.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Copy`/`Clone` so the
/// policy can be logged and compared.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct RetryConfig {
    /// Upper bound on the number of attempts, including the first one.
    pub max_attempts: u32,
    /// Base delay, in milliseconds, fed to the backoff computation.
    pub base_delay_ms: u64,
}
841
842fn should_retry(err: &anyhow::Error) -> bool {
843    let lower = err.to_string().to_lowercase();
844    lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
845}
846
847impl From<FlowRetryConfig> for RetryConfig {
848    fn from(value: FlowRetryConfig) -> Self {
849        Self {
850            max_attempts: value.max_attempts.max(1),
851            base_delay_ms: value.base_delay_ms.max(50),
852        }
853    }
854}