1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use crate::config::{FlowRetryConfig, HostConfig};
18use crate::pack::{FlowDescriptor, PackRuntime};
19use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
20use greentic_types::{Flow, Node, NodeId, Routing};
21
22pub struct FlowEngine {
23 packs: Vec<Arc<PackRuntime>>,
24 flows: Vec<FlowDescriptor>,
25 flow_sources: HashMap<String, usize>,
26 flow_cache: RwLock<HashMap<String, HostFlow>>,
27 default_env: String,
28}
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
31pub struct FlowSnapshot {
32 pub flow_id: String,
33 pub next_node: String,
34 pub state: ExecutionState,
35}
36
37#[derive(Clone, Debug)]
38pub struct FlowWait {
39 pub reason: Option<String>,
40 pub snapshot: FlowSnapshot,
41}
42
43#[derive(Clone, Debug)]
44pub enum FlowStatus {
45 Completed,
46 Waiting(FlowWait),
47}
48
49#[derive(Clone, Debug)]
50pub struct FlowExecution {
51 pub output: Value,
52 pub status: FlowStatus,
53}
54
55#[derive(Clone, Debug)]
56struct HostFlow {
57 id: String,
58 start: Option<NodeId>,
59 nodes: IndexMap<NodeId, HostNode>,
60}
61
62#[derive(Clone, Debug)]
63pub struct HostNode {
64 kind: NodeKind,
65 pub component: String,
67 payload_expr: Value,
68 routing: Routing,
69}
70
71#[derive(Clone, Debug)]
72enum NodeKind {
73 Exec { target_component: String },
74 PackComponent { component_ref: String },
75 FlowCall,
76 BuiltinEmit { kind: EmitKind },
77 Wait,
78}
79
/// Flavour of a builtin `emit.*` node.
#[derive(Clone, Debug)]
enum EmitKind {
    /// `emit.log`
    Log,
    /// `emit.response`
    Response,
    /// Any other emit component; holds the full component reference.
    Other(String),
}
86
87impl FlowExecution {
88 fn completed(output: Value) -> Self {
89 Self {
90 output,
91 status: FlowStatus::Completed,
92 }
93 }
94
95 fn waiting(output: Value, wait: FlowWait) -> Self {
96 Self {
97 output,
98 status: FlowStatus::Waiting(wait),
99 }
100 }
101}
102
103impl FlowEngine {
104 pub async fn new(packs: Vec<Arc<PackRuntime>>, _config: Arc<HostConfig>) -> Result<Self> {
105 let mut flow_sources = HashMap::new();
106 let mut descriptors = Vec::new();
107 for (idx, pack) in packs.iter().enumerate() {
108 let flows = pack.list_flows().await?;
109 for flow in flows {
110 tracing::info!(
111 flow_id = %flow.id,
112 flow_type = %flow.flow_type,
113 pack_index = idx,
114 "registered flow"
115 );
116 flow_sources.insert(flow.id.clone(), idx);
117 descriptors.retain(|existing: &FlowDescriptor| existing.id != flow.id);
118 descriptors.push(flow);
119 }
120 }
121
122 let mut flow_map = HashMap::new();
123 for flow in &descriptors {
124 if let Some(&pack_idx) = flow_sources.get(&flow.id) {
125 let pack_clone = Arc::clone(&packs[pack_idx]);
126 let flow_id = flow.id.clone();
127 let task_flow_id = flow_id.clone();
128 match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
129 Ok(Ok(flow)) => {
130 flow_map.insert(flow_id, HostFlow::from(flow));
131 }
132 Ok(Err(err)) => {
133 tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
134 }
135 Err(err) => {
136 tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
137 }
138 }
139 }
140 }
141
142 Ok(Self {
143 packs,
144 flows: descriptors,
145 flow_sources,
146 flow_cache: RwLock::new(flow_map),
147 default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
148 })
149 }
150
151 async fn get_or_load_flow(&self, flow_id: &str) -> Result<HostFlow> {
152 if let Some(flow) = self.flow_cache.read().get(flow_id).cloned() {
153 return Ok(flow);
154 }
155
156 let pack_idx = *self
157 .flow_sources
158 .get(flow_id)
159 .with_context(|| format!("flow {flow_id} not registered"))?;
160 let pack = Arc::clone(&self.packs[pack_idx]);
161 let flow_id_owned = flow_id.to_string();
162 let task_flow_id = flow_id_owned.clone();
163 let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
164 .await
165 .context("failed to join flow metadata task")??;
166 let host_flow = HostFlow::from(flow);
167 self.flow_cache
168 .write()
169 .insert(flow_id_owned.clone(), host_flow.clone());
170 Ok(host_flow)
171 }
172
173 pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
174 let span = tracing::info_span!(
175 "flow.execute",
176 tenant = tracing::field::Empty,
177 flow_id = tracing::field::Empty,
178 node_id = tracing::field::Empty,
179 tool = tracing::field::Empty,
180 action = tracing::field::Empty
181 );
182 annotate_span(
183 &span,
184 &FlowSpanAttributes {
185 tenant: ctx.tenant,
186 flow_id: ctx.flow_id,
187 node_id: ctx.node_id,
188 tool: ctx.tool,
189 action: ctx.action,
190 },
191 );
192 set_flow_context(
193 &self.default_env,
194 ctx.tenant,
195 ctx.flow_id,
196 ctx.node_id,
197 ctx.provider_id,
198 ctx.session_id,
199 );
200 let retry_config = ctx.retry_config;
201 let original_input = input;
202 async move {
203 let mut attempt = 0u32;
204 loop {
205 attempt += 1;
206 match self.execute_once(&ctx, original_input.clone()).await {
207 Ok(value) => return Ok(value),
208 Err(err) => {
209 if attempt >= retry_config.max_attempts || !should_retry(&err) {
210 return Err(err);
211 }
212 let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
213 tracing::warn!(
214 tenant = ctx.tenant,
215 flow_id = ctx.flow_id,
216 attempt,
217 max_attempts = retry_config.max_attempts,
218 delay_ms = delay,
219 error = %err,
220 "transient flow execution failure, backing off"
221 );
222 tokio::time::sleep(Duration::from_millis(delay)).await;
223 }
224 }
225 }
226 }
227 .instrument(span)
228 .await
229 }
230
231 pub async fn resume(
232 &self,
233 ctx: FlowContext<'_>,
234 snapshot: FlowSnapshot,
235 input: Value,
236 ) -> Result<FlowExecution> {
237 if snapshot.flow_id != ctx.flow_id {
238 bail!(
239 "snapshot flow {} does not match requested {}",
240 snapshot.flow_id,
241 ctx.flow_id
242 );
243 }
244 let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
245 let mut state = snapshot.state;
246 state.replace_input(input);
247 self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
248 .await
249 }
250
251 async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
252 let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
253 let state = ExecutionState::new(input);
254 self.drive_flow(ctx, flow_ir, state, None).await
255 }
256
257 async fn drive_flow(
258 &self,
259 ctx: &FlowContext<'_>,
260 flow_ir: HostFlow,
261 mut state: ExecutionState,
262 resume_from: Option<String>,
263 ) -> Result<FlowExecution> {
264 let mut current = match resume_from {
265 Some(node) => NodeId::from_str(&node)
266 .with_context(|| format!("invalid resume node id `{node}`"))?,
267 None => flow_ir
268 .start
269 .clone()
270 .or_else(|| flow_ir.nodes.keys().next().cloned())
271 .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
272 };
273
274 loop {
275 let node = flow_ir
276 .nodes
277 .get(¤t)
278 .with_context(|| format!("node {} not found", current.as_str()))?;
279
280 let payload = node.payload_expr.clone();
281 let observed_payload = payload.clone();
282 let node_id = current.clone();
283 let event = NodeEvent {
284 context: ctx,
285 node_id: node_id.as_str(),
286 node,
287 payload: &observed_payload,
288 };
289 if let Some(observer) = ctx.observer {
290 observer.on_node_start(&event);
291 }
292 let DispatchOutcome {
293 output,
294 wait_reason,
295 } = self
296 .dispatch_node(ctx, node_id.as_str(), node, &mut state, payload)
297 .await?;
298
299 state.nodes.insert(node_id.clone().into(), output.clone());
300
301 let (next, should_exit) = match &node.routing {
302 Routing::Next { node_id } => (Some(node_id.clone()), false),
303 Routing::End | Routing::Reply => (None, true),
304 Routing::Branch { default, .. } => (default.clone(), default.is_none()),
305 Routing::Custom(raw) => {
306 tracing::warn!(
307 flow_id = %flow_ir.id,
308 node_id = %node_id,
309 routing = ?raw,
310 "unsupported routing; terminating flow"
311 );
312 (None, true)
313 }
314 };
315
316 if let Some(wait_reason) = wait_reason {
317 let resume_target = next.clone().ok_or_else(|| {
318 anyhow!(
319 "session.wait node {} requires a non-empty route",
320 current.as_str()
321 )
322 })?;
323 let mut snapshot_state = state.clone();
324 snapshot_state.clear_egress();
325 let snapshot = FlowSnapshot {
326 flow_id: ctx.flow_id.to_string(),
327 next_node: resume_target.as_str().to_string(),
328 state: snapshot_state,
329 };
330 let output_value = state.clone().finalize_with(None);
331 return Ok(FlowExecution::waiting(
332 output_value,
333 FlowWait {
334 reason: Some(wait_reason),
335 snapshot,
336 },
337 ));
338 }
339
340 if should_exit {
341 return Ok(FlowExecution::completed(
342 state.finalize_with(Some(output.payload.clone())),
343 ));
344 }
345
346 match next {
347 Some(n) => current = n,
348 None => {
349 return Ok(FlowExecution::completed(
350 state.finalize_with(Some(output.payload.clone())),
351 ));
352 }
353 }
354 }
355 }
356
357 async fn dispatch_node(
358 &self,
359 ctx: &FlowContext<'_>,
360 node_id: &str,
361 node: &HostNode,
362 state: &mut ExecutionState,
363 payload: Value,
364 ) -> Result<DispatchOutcome> {
365 match &node.kind {
366 NodeKind::Exec { target_component } => self
367 .execute_component_exec(
368 ctx,
369 node_id,
370 state,
371 payload,
372 Some(target_component.as_str()),
373 )
374 .await
375 .map(DispatchOutcome::complete),
376 NodeKind::PackComponent { component_ref } => self
377 .execute_component_exec(ctx, node_id, state, payload, Some(component_ref.as_str()))
378 .await
379 .map(DispatchOutcome::complete),
380 NodeKind::FlowCall => self
381 .execute_flow_call(ctx, payload)
382 .await
383 .map(DispatchOutcome::complete),
384 NodeKind::BuiltinEmit { kind } => {
385 match kind {
386 EmitKind::Log | EmitKind::Response => {}
387 EmitKind::Other(component) => {
388 tracing::debug!(%component, "handling emit.* as builtin");
389 }
390 }
391 state.push_egress(payload.clone());
392 Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
393 }
394 NodeKind::Wait => {
395 let reason = extract_wait_reason(&payload);
396 Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
397 }
398 }
399 }
400
401 async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
402 #[derive(Deserialize)]
403 struct FlowCallPayload {
404 #[serde(alias = "flow")]
405 flow_id: String,
406 #[serde(default)]
407 input: Value,
408 }
409
410 let call: FlowCallPayload =
411 serde_json::from_value(payload).context("invalid payload for flow.call node")?;
412 if call.flow_id.trim().is_empty() {
413 bail!("flow.call requires a non-empty flow_id");
414 }
415
416 let sub_input = if call.input.is_null() {
417 Value::Null
418 } else {
419 call.input
420 };
421
422 let flow_id_owned = call.flow_id;
423 let action = "flow.call";
424 let sub_ctx = FlowContext {
425 tenant: ctx.tenant,
426 flow_id: flow_id_owned.as_str(),
427 node_id: None,
428 tool: ctx.tool,
429 action: Some(action),
430 session_id: ctx.session_id,
431 provider_id: ctx.provider_id,
432 retry_config: ctx.retry_config,
433 observer: ctx.observer,
434 mocks: ctx.mocks,
435 };
436
437 let execution = Box::pin(self.execute(sub_ctx, sub_input))
438 .await
439 .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
440 match execution.status {
441 FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
442 FlowStatus::Waiting(wait) => bail!(
443 "flow.call cannot pause (flow {} waiting {:?})",
444 flow_id_owned,
445 wait.reason
446 ),
447 }
448 }
449
450 async fn execute_component_exec(
451 &self,
452 ctx: &FlowContext<'_>,
453 node_id: &str,
454 state: &ExecutionState,
455 payload: Value,
456 component_override: Option<&str>,
457 ) -> Result<NodeOutput> {
458 #[derive(Deserialize)]
459 struct ComponentPayload {
460 #[serde(default, alias = "component_ref", alias = "component")]
461 component: Option<String>,
462 #[serde(alias = "op")]
463 operation: Option<String>,
464 #[serde(default)]
465 input: Value,
466 #[serde(default)]
467 config: Value,
468 }
469
470 let payload: ComponentPayload =
471 serde_json::from_value(payload).context("invalid payload for component.exec")?;
472 let component_ref = component_override
473 .map(str::to_string)
474 .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
475 .with_context(|| "component.exec requires a component_ref")?;
476 let operation = payload
477 .operation
478 .filter(|v| !v.trim().is_empty())
479 .with_context(|| "component.exec requires an operation")?;
480 let mut input = payload.input;
481 if let Value::Object(mut map) = input {
482 map.entry("state".to_string())
483 .or_insert_with(|| state.context());
484 input = Value::Object(map);
485 }
486 let input_json = serde_json::to_string(&input)?;
487 let config_json = if payload.config.is_null() {
488 None
489 } else {
490 Some(serde_json::to_string(&payload.config)?)
491 };
492
493 let pack_idx = *self
494 .flow_sources
495 .get(ctx.flow_id)
496 .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
497 let pack = Arc::clone(&self.packs[pack_idx]);
498 let exec_ctx = component_exec_ctx(ctx, node_id);
499 let value = pack
500 .invoke_component(
501 &component_ref,
502 exec_ctx,
503 &operation,
504 config_json,
505 input_json,
506 )
507 .await?;
508
509 Ok(NodeOutput::new(value))
510 }
511
512 pub fn flows(&self) -> &[FlowDescriptor] {
513 &self.flows
514 }
515
516 pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
517 self.flows
518 .iter()
519 .find(|descriptor| descriptor.flow_type == flow_type)
520 }
521
522 pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
523 self.flows
524 .iter()
525 .find(|descriptor| descriptor.id == flow_id)
526 }
527}
528
529pub trait ExecutionObserver: Send + Sync {
530 fn on_node_start(&self, event: &NodeEvent<'_>);
531 fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
532 fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
533}
534
535pub struct NodeEvent<'a> {
536 pub context: &'a FlowContext<'a>,
537 pub node_id: &'a str,
538 pub node: &'a HostNode,
539 pub payload: &'a Value,
540}
541
542#[derive(Clone, Debug, Serialize, Deserialize)]
543pub struct ExecutionState {
544 input: Value,
545 nodes: HashMap<String, NodeOutput>,
546 egress: Vec<Value>,
547}
548
549impl ExecutionState {
550 fn new(input: Value) -> Self {
551 Self {
552 input,
553 nodes: HashMap::new(),
554 egress: Vec::new(),
555 }
556 }
557
558 fn context(&self) -> Value {
559 let mut nodes = JsonMap::new();
560 for (id, output) in &self.nodes {
561 nodes.insert(
562 id.clone(),
563 json!({
564 "ok": output.ok,
565 "payload": output.payload.clone(),
566 "meta": output.meta.clone(),
567 }),
568 );
569 }
570 json!({
571 "input": self.input.clone(),
572 "nodes": nodes,
573 })
574 }
575 fn push_egress(&mut self, payload: Value) {
576 self.egress.push(payload);
577 }
578
579 fn replace_input(&mut self, input: Value) {
580 self.input = input;
581 }
582
583 fn clear_egress(&mut self) {
584 self.egress.clear();
585 }
586
587 fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
588 if self.egress.is_empty() {
589 return final_payload.unwrap_or(Value::Null);
590 }
591 let mut emitted = std::mem::take(&mut self.egress);
592 if let Some(value) = final_payload {
593 match value {
594 Value::Null => {}
595 Value::Array(items) => emitted.extend(items),
596 other => emitted.push(other),
597 }
598 }
599 Value::Array(emitted)
600 }
601}
602
603#[derive(Clone, Debug, Serialize, Deserialize)]
604struct NodeOutput {
605 ok: bool,
606 payload: Value,
607 meta: Value,
608}
609
610impl NodeOutput {
611 fn new(payload: Value) -> Self {
612 Self {
613 ok: true,
614 payload,
615 meta: Value::Null,
616 }
617 }
618}
619
620struct DispatchOutcome {
621 output: NodeOutput,
622 wait_reason: Option<String>,
623}
624
625impl DispatchOutcome {
626 fn complete(output: NodeOutput) -> Self {
627 Self {
628 output,
629 wait_reason: None,
630 }
631 }
632
633 fn wait(output: NodeOutput, reason: Option<String>) -> Self {
634 Self {
635 output,
636 wait_reason: reason,
637 }
638 }
639}
640
641fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
642 ComponentExecCtx {
643 tenant: ComponentTenantCtx {
644 tenant: ctx.tenant.to_string(),
645 team: None,
646 user: ctx.provider_id.map(str::to_string),
647 trace_id: None,
648 correlation_id: ctx.session_id.map(str::to_string),
649 deadline_unix_ms: None,
650 attempt: 1,
651 idempotency_key: ctx.session_id.map(str::to_string),
652 },
653 flow_id: ctx.flow_id.to_string(),
654 node_id: Some(node_id.to_string()),
655 }
656}
657
658fn extract_wait_reason(payload: &Value) -> Option<String> {
659 match payload {
660 Value::String(s) => Some(s.clone()),
661 Value::Object(map) => map
662 .get("reason")
663 .and_then(Value::as_str)
664 .map(|value| value.to_string()),
665 _ => None,
666 }
667}
668
669impl From<Flow> for HostFlow {
670 fn from(value: Flow) -> Self {
671 let mut nodes = IndexMap::new();
672 for (id, node) in value.nodes {
673 nodes.insert(id.clone(), HostNode::from(node));
674 }
675 let start = value
676 .entrypoints
677 .get("default")
678 .and_then(Value::as_str)
679 .and_then(|id| NodeId::from_str(id).ok())
680 .or_else(|| nodes.keys().next().cloned());
681 Self {
682 id: value.id.as_str().to_string(),
683 start,
684 nodes,
685 }
686 }
687}
688
689impl From<Node> for HostNode {
690 fn from(node: Node) -> Self {
691 let component_ref = node.component.id.as_str().to_string();
692 let kind = match component_ref.as_str() {
693 "component.exec" => {
694 let target = extract_target_component(&node.input.mapping)
695 .unwrap_or_else(|| "component.exec".to_string());
696 if target.starts_with("emit.") {
697 NodeKind::BuiltinEmit {
698 kind: emit_kind_from_ref(&target),
699 }
700 } else {
701 NodeKind::Exec {
702 target_component: target,
703 }
704 }
705 }
706 "flow.call" => NodeKind::FlowCall,
707 "session.wait" => NodeKind::Wait,
708 comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
709 kind: emit_kind_from_ref(comp),
710 },
711 other => NodeKind::PackComponent {
712 component_ref: other.to_string(),
713 },
714 };
715 let component_label = match &kind {
716 NodeKind::Exec { .. } => "component.exec".to_string(),
717 NodeKind::PackComponent { component_ref } => component_ref.clone(),
718 NodeKind::FlowCall => "flow.call".to_string(),
719 NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
720 NodeKind::Wait => "session.wait".to_string(),
721 };
722 let payload_expr = match kind {
723 NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
724 _ => node.input.mapping.clone(),
725 };
726 Self {
727 kind,
728 component: component_label,
729 payload_expr,
730 routing: node.routing,
731 }
732 }
733}
734
735fn extract_target_component(payload: &Value) -> Option<String> {
736 match payload {
737 Value::Object(map) => map
738 .get("component")
739 .or_else(|| map.get("component_ref"))
740 .and_then(Value::as_str)
741 .map(|s| s.to_string()),
742 _ => None,
743 }
744}
745
746fn extract_emit_payload(payload: &Value) -> Value {
747 if let Value::Object(map) = payload {
748 if let Some(input) = map.get("input") {
749 return input.clone();
750 }
751 if let Some(inner) = map.get("payload") {
752 return inner.clone();
753 }
754 }
755 payload.clone()
756}
757
758fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
759 match component_ref {
760 "emit.log" => EmitKind::Log,
761 "emit.response" => EmitKind::Response,
762 other => EmitKind::Other(other.to_string()),
763 }
764}
765
766fn emit_ref_from_kind(kind: &EmitKind) -> String {
767 match kind {
768 EmitKind::Log => "emit.log".to_string(),
769 EmitKind::Response => "emit.response".to_string(),
770 EmitKind::Other(other) => other.clone(),
771 }
772}
773
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// `context()` must expose the flow input as well as every recorded node
    /// output.
    #[test]
    fn templating_renders_with_partials_and_data() {
        let mut state = ExecutionState::new(json!({ "city": "London" }));
        state.nodes.insert(
            "forecast".to_string(),
            NodeOutput::new(json!({ "temp": "20C" })),
        );

        let ctx = state.context();
        assert_eq!(ctx["input"]["city"], json!("London"));
        assert_eq!(ctx["nodes"]["forecast"]["ok"], json!(true));
        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
    }

    /// Queued egress payloads come first, followed by the final payload.
    #[test]
    fn finalize_wraps_emitted_payloads() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "first" }));
        state.push_egress(json!({ "text": "second" }));
        let result = state.finalize_with(Some(json!({ "text": "final" })));
        assert_eq!(
            result,
            json!([
                { "text": "first" },
                { "text": "second" },
                { "text": "final" }
            ])
        );
    }

    /// A final array is appended element by element instead of being nested.
    #[test]
    fn finalize_flattens_final_array() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "only" }));
        let result = state.finalize_with(Some(json!([
            { "text": "extra-1" },
            { "text": "extra-2" }
        ])));
        assert_eq!(
            result,
            json!([
                { "text": "only" },
                { "text": "extra-1" },
                { "text": "extra-2" }
            ])
        );
    }

    /// Without queued egress the final payload is returned unwrapped, and a
    /// missing final payload becomes `null`.
    #[test]
    fn finalize_without_egress_returns_final_payload() {
        let state = ExecutionState::new(json!({}));
        assert_eq!(
            state.finalize_with(Some(json!({ "text": "final" }))),
            json!({ "text": "final" })
        );
        assert_eq!(
            ExecutionState::new(json!({})).finalize_with(None),
            Value::Null
        );
    }
}
826
827use tracing::Instrument;
828
829pub struct FlowContext<'a> {
830 pub tenant: &'a str,
831 pub flow_id: &'a str,
832 pub node_id: Option<&'a str>,
833 pub tool: Option<&'a str>,
834 pub action: Option<&'a str>,
835 pub session_id: Option<&'a str>,
836 pub provider_id: Option<&'a str>,
837 pub retry_config: RetryConfig,
838 pub observer: Option<&'a dyn ExecutionObserver>,
839 pub mocks: Option<&'a MockLayer>,
840}
841
/// Retry policy for flow execution.
#[derive(Clone, Copy)]
pub struct RetryConfig {
    /// Upper bound on the number of attempts, the first one included.
    pub max_attempts: u32,
    /// Base delay in milliseconds fed into the backoff computation.
    pub base_delay_ms: u64,
}
847
848fn should_retry(err: &anyhow::Error) -> bool {
849 let lower = err.to_string().to_lowercase();
850 lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
851}
852
853impl From<FlowRetryConfig> for RetryConfig {
854 fn from(value: FlowRetryConfig) -> Self {
855 Self {
856 max_attempts: value.max_attempts.max(1),
857 base_delay_ms: value.base_delay_ms.max(50),
858 }
859 }
860}