1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::sync::Arc;
5use std::time::Duration;
6
7use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
8use anyhow::{Context, Result, anyhow, bail};
9use greentic_flow::ir::{FlowIR, NodeIR, RouteIR};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use crate::config::{FlowRetryConfig, HostConfig};
18use crate::pack::{FlowDescriptor, PackRuntime};
19use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
20
/// Executes flows that are provided by a set of loaded packs.
pub struct FlowEngine {
    /// Pack runtimes that flows are loaded from and components are invoked on.
    packs: Vec<Arc<PackRuntime>>,
    /// Descriptors of the registered flows (one entry per flow id).
    flows: Vec<FlowDescriptor>,
    /// Maps a flow id to the index in `packs` of the pack that provides it.
    flow_sources: HashMap<String, usize>,
    /// Cache of flow IR, keyed by flow id.
    flow_ir: RwLock<HashMap<String, HostFlowIR>>,
    /// Environment name handed to `set_flow_context`; read from `GREENTIC_ENV`,
    /// defaulting to `"local"`.
    default_env: String,
}
28
/// Serializable capture of a paused flow, sufficient to continue it via `FlowEngine::resume`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FlowSnapshot {
    /// Id of the flow the snapshot was taken from.
    pub flow_id: String,
    /// Id of the node at which execution continues when the flow is resumed.
    pub next_node: String,
    /// Execution state captured when the flow paused.
    pub state: ExecutionState,
}
35
/// Details attached to a flow that paused instead of completing.
#[derive(Clone, Debug)]
pub struct FlowWait {
    /// Reason for the pause, when one is available.
    pub reason: Option<String>,
    /// Snapshot from which the flow can be resumed.
    pub snapshot: FlowSnapshot,
}
41
/// Terminal status of a single flow run.
#[derive(Clone, Debug)]
pub enum FlowStatus {
    /// The flow ran to its end.
    Completed,
    /// The flow paused and can be resumed from the contained snapshot.
    Waiting(FlowWait),
}
47
/// Result of running (or resuming) a flow.
#[derive(Clone, Debug)]
pub struct FlowExecution {
    /// Value produced by the run.
    pub output: Value,
    /// Whether the flow completed or is waiting to be resumed.
    pub status: FlowStatus,
}
53
/// Host-side representation of a flow, converted from `FlowIR`.
#[derive(Clone, Debug)]
#[allow(dead_code)]
struct HostFlowIR {
    /// Flow id.
    id: String,
    /// Flow type, copied from the source IR.
    flow_type: String,
    /// Id of the declared start node, if the flow names one.
    start: Option<String>,
    /// Flow parameters, copied from the source IR.
    parameters: Value,
    /// Nodes keyed by node id, in insertion order.
    nodes: IndexMap<String, HostNode>,
}
63
/// A single flow node in the form the engine dispatches.
#[derive(Clone, Debug)]
pub struct HostNode {
    /// How the engine executes this node.
    kind: NodeKind,
    /// Component label derived from `kind` when the node is converted from `NodeIR`.
    pub component: String,
    /// Payload handed to the node when it is dispatched.
    payload_expr: Value,
    /// Outgoing routes, evaluated in order after the node has run.
    routes: Vec<RouteIR>,
}
72
/// Execution strategy of a node.
#[derive(Clone, Debug)]
enum NodeKind {
    /// `component.exec` node that invokes `target_component`.
    Exec { target_component: String },
    /// Node whose component name is used directly as the component reference.
    PackComponent { component_ref: String },
    /// `flow.call` node that runs another flow.
    FlowCall,
    /// `emit.*` node handled by the engine itself.
    BuiltinEmit { kind: EmitKind },
    /// `session.wait` node that pauses the flow.
    Wait,
}
81
/// Flavour of a builtin `emit.*` node.
#[derive(Clone, Debug)]
enum EmitKind {
    /// `emit.log`
    Log,
    /// `emit.response`
    Response,
    /// Any other emit reference, stored verbatim.
    Other(String),
}
88
89impl FlowExecution {
90 fn completed(output: Value) -> Self {
91 Self {
92 output,
93 status: FlowStatus::Completed,
94 }
95 }
96
97 fn waiting(output: Value, wait: FlowWait) -> Self {
98 Self {
99 output,
100 status: FlowStatus::Waiting(wait),
101 }
102 }
103}
104
105impl FlowEngine {
106 pub async fn new(packs: Vec<Arc<PackRuntime>>, _config: Arc<HostConfig>) -> Result<Self> {
107 let mut flow_sources = HashMap::new();
108 let mut descriptors = Vec::new();
109 for (idx, pack) in packs.iter().enumerate() {
110 let flows = pack.list_flows().await?;
111 for flow in flows {
112 tracing::info!(
113 flow_id = %flow.id,
114 flow_type = %flow.flow_type,
115 pack_index = idx,
116 "registered flow"
117 );
118 flow_sources.insert(flow.id.clone(), idx);
119 descriptors.retain(|existing: &FlowDescriptor| existing.id != flow.id);
120 descriptors.push(flow);
121 }
122 }
123
124 let mut ir_map = HashMap::new();
125 for flow in &descriptors {
126 if let Some(&pack_idx) = flow_sources.get(&flow.id) {
127 let pack_clone = Arc::clone(&packs[pack_idx]);
128 let flow_id = flow.id.clone();
129 let task_flow_id = flow_id.clone();
130 match task::spawn_blocking(move || pack_clone.load_flow_ir(&task_flow_id)).await {
131 Ok(Ok(ir)) => {
132 ir_map.insert(flow_id, HostFlowIR::from(ir));
133 }
134 Ok(Err(err)) => {
135 tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
136 }
137 Err(err) => {
138 tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
139 }
140 }
141 }
142 }
143
144 Ok(Self {
145 packs,
146 flows: descriptors,
147 flow_sources,
148 flow_ir: RwLock::new(ir_map),
149 default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
150 })
151 }
152
153 async fn get_or_load_flow_ir(&self, flow_id: &str) -> Result<HostFlowIR> {
154 if let Some(ir) = self.flow_ir.read().get(flow_id).cloned() {
155 return Ok(ir);
156 }
157
158 let pack_idx = *self
159 .flow_sources
160 .get(flow_id)
161 .with_context(|| format!("flow {flow_id} not registered"))?;
162 let pack = Arc::clone(&self.packs[pack_idx]);
163 let flow_id_owned = flow_id.to_string();
164 let task_flow_id = flow_id_owned.clone();
165 let ir = task::spawn_blocking(move || pack.load_flow_ir(&task_flow_id))
166 .await
167 .context("failed to join flow metadata task")??;
168 let host_ir = HostFlowIR::from(ir);
169 self.flow_ir
170 .write()
171 .insert(flow_id_owned.clone(), host_ir.clone());
172 Ok(host_ir)
173 }
174
175 pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
176 let span = tracing::info_span!(
177 "flow.execute",
178 tenant = tracing::field::Empty,
179 flow_id = tracing::field::Empty,
180 node_id = tracing::field::Empty,
181 tool = tracing::field::Empty,
182 action = tracing::field::Empty
183 );
184 annotate_span(
185 &span,
186 &FlowSpanAttributes {
187 tenant: ctx.tenant,
188 flow_id: ctx.flow_id,
189 node_id: ctx.node_id,
190 tool: ctx.tool,
191 action: ctx.action,
192 },
193 );
194 set_flow_context(
195 &self.default_env,
196 ctx.tenant,
197 ctx.flow_id,
198 ctx.node_id,
199 ctx.provider_id,
200 ctx.session_id,
201 );
202 let retry_config = ctx.retry_config;
203 let original_input = input;
204 async move {
205 let mut attempt = 0u32;
206 loop {
207 attempt += 1;
208 match self.execute_once(&ctx, original_input.clone()).await {
209 Ok(value) => return Ok(value),
210 Err(err) => {
211 if attempt >= retry_config.max_attempts || !should_retry(&err) {
212 return Err(err);
213 }
214 let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
215 tracing::warn!(
216 tenant = ctx.tenant,
217 flow_id = ctx.flow_id,
218 attempt,
219 max_attempts = retry_config.max_attempts,
220 delay_ms = delay,
221 error = %err,
222 "transient flow execution failure, backing off"
223 );
224 tokio::time::sleep(Duration::from_millis(delay)).await;
225 }
226 }
227 }
228 }
229 .instrument(span)
230 .await
231 }
232
233 pub async fn resume(
234 &self,
235 ctx: FlowContext<'_>,
236 snapshot: FlowSnapshot,
237 input: Value,
238 ) -> Result<FlowExecution> {
239 if snapshot.flow_id != ctx.flow_id {
240 bail!(
241 "snapshot flow {} does not match requested {}",
242 snapshot.flow_id,
243 ctx.flow_id
244 );
245 }
246 let flow_ir = self.get_or_load_flow_ir(ctx.flow_id).await?;
247 let mut state = snapshot.state;
248 state.replace_input(input);
249 self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
250 .await
251 }
252
253 async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
254 let flow_ir = self.get_or_load_flow_ir(ctx.flow_id).await?;
255 let state = ExecutionState::new(input);
256 self.drive_flow(ctx, flow_ir, state, None).await
257 }
258
259 async fn drive_flow(
260 &self,
261 ctx: &FlowContext<'_>,
262 flow_ir: HostFlowIR,
263 mut state: ExecutionState,
264 resume_from: Option<String>,
265 ) -> Result<FlowExecution> {
266 let mut current = flow_ir
267 .start
268 .clone()
269 .or_else(|| flow_ir.nodes.keys().next().cloned())
270 .with_context(|| format!("flow {} has no start node", flow_ir.id))?;
271 if let Some(resume) = resume_from {
272 current = resume;
273 }
274 let mut final_payload = None;
275
276 loop {
277 let node = flow_ir
278 .nodes
279 .get(¤t)
280 .with_context(|| format!("node {current} not found"))?;
281
282 let payload = node.payload_expr.clone();
283 let observed_payload = payload.clone();
284 let node_id = current.clone();
285 let event = NodeEvent {
286 context: ctx,
287 node_id: &node_id,
288 node,
289 payload: &observed_payload,
290 };
291 if let Some(observer) = ctx.observer {
292 observer.on_node_start(&event);
293 }
294 let DispatchOutcome {
295 output,
296 wait_reason,
297 } = self
298 .dispatch_node(ctx, ¤t, node, &mut state, payload)
299 .await?;
300
301 state.nodes.insert(current.clone(), output.clone());
302
303 let mut next = None;
304 let mut should_exit = false;
305 for route in &node.routes {
306 if route.out || matches!(route.to.as_deref(), Some("out")) {
307 final_payload = Some(output.payload.clone());
308 should_exit = true;
309 break;
310 }
311 if let Some(to) = &route.to {
312 next = Some(to.clone());
313 break;
314 }
315 }
316
317 if let Some(wait_reason) = wait_reason {
318 let resume_target = next.clone().ok_or_else(|| {
319 anyhow!("session.wait node {current} requires a non-empty route")
320 })?;
321 let mut snapshot_state = state.clone();
322 snapshot_state.clear_egress();
323 let snapshot = FlowSnapshot {
324 flow_id: ctx.flow_id.to_string(),
325 next_node: resume_target,
326 state: snapshot_state,
327 };
328 let output_value = state.clone().finalize_with(None);
329 return Ok(FlowExecution::waiting(
330 output_value,
331 FlowWait {
332 reason: Some(wait_reason),
333 snapshot,
334 },
335 ));
336 }
337
338 if should_exit {
339 break;
340 }
341
342 match next {
343 Some(n) => current = n,
344 None => {
345 final_payload = Some(output.payload.clone());
346 break;
347 }
348 }
349 }
350
351 let payload = final_payload.unwrap_or(Value::Null);
352 Ok(FlowExecution::completed(state.finalize_with(Some(payload))))
353 }
354
355 async fn dispatch_node(
356 &self,
357 ctx: &FlowContext<'_>,
358 node_id: &str,
359 node: &HostNode,
360 state: &mut ExecutionState,
361 payload: Value,
362 ) -> Result<DispatchOutcome> {
363 match &node.kind {
364 NodeKind::Exec { target_component } => self
365 .execute_component_exec(
366 ctx,
367 node_id,
368 state,
369 payload,
370 Some(target_component.as_str()),
371 )
372 .await
373 .map(DispatchOutcome::complete),
374 NodeKind::PackComponent { component_ref } => self
375 .execute_component_exec(ctx, node_id, state, payload, Some(component_ref.as_str()))
376 .await
377 .map(DispatchOutcome::complete),
378 NodeKind::FlowCall => self
379 .execute_flow_call(ctx, payload)
380 .await
381 .map(DispatchOutcome::complete),
382 NodeKind::BuiltinEmit { kind } => {
383 match kind {
384 EmitKind::Log | EmitKind::Response => {}
385 EmitKind::Other(component) => {
386 tracing::debug!(%component, "handling emit.* as builtin");
387 }
388 }
389 state.push_egress(payload.clone());
390 Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
391 }
392 NodeKind::Wait => {
393 let reason = extract_wait_reason(&payload);
394 Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
395 }
396 }
397 }
398
399 async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
400 #[derive(Deserialize)]
401 struct FlowCallPayload {
402 #[serde(alias = "flow")]
403 flow_id: String,
404 #[serde(default)]
405 input: Value,
406 }
407
408 let call: FlowCallPayload =
409 serde_json::from_value(payload).context("invalid payload for flow.call node")?;
410 if call.flow_id.trim().is_empty() {
411 bail!("flow.call requires a non-empty flow_id");
412 }
413
414 let sub_input = if call.input.is_null() {
415 Value::Null
416 } else {
417 call.input
418 };
419
420 let flow_id_owned = call.flow_id;
421 let action = "flow.call";
422 let sub_ctx = FlowContext {
423 tenant: ctx.tenant,
424 flow_id: flow_id_owned.as_str(),
425 node_id: None,
426 tool: ctx.tool,
427 action: Some(action),
428 session_id: ctx.session_id,
429 provider_id: ctx.provider_id,
430 retry_config: ctx.retry_config,
431 observer: ctx.observer,
432 mocks: ctx.mocks,
433 };
434
435 let execution = Box::pin(self.execute(sub_ctx, sub_input))
436 .await
437 .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
438 match execution.status {
439 FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
440 FlowStatus::Waiting(wait) => bail!(
441 "flow.call cannot pause (flow {} waiting {:?})",
442 flow_id_owned,
443 wait.reason
444 ),
445 }
446 }
447
448 async fn execute_component_exec(
449 &self,
450 ctx: &FlowContext<'_>,
451 node_id: &str,
452 state: &ExecutionState,
453 payload: Value,
454 component_override: Option<&str>,
455 ) -> Result<NodeOutput> {
456 #[derive(Deserialize)]
457 struct ComponentPayload {
458 #[serde(default, alias = "component_ref", alias = "component")]
459 component: Option<String>,
460 #[serde(alias = "op")]
461 operation: Option<String>,
462 #[serde(default)]
463 input: Value,
464 #[serde(default)]
465 config: Value,
466 }
467
468 let payload: ComponentPayload =
469 serde_json::from_value(payload).context("invalid payload for component.exec")?;
470 let component_ref = component_override
471 .map(str::to_string)
472 .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
473 .with_context(|| "component.exec requires a component_ref")?;
474 let operation = payload
475 .operation
476 .filter(|v| !v.trim().is_empty())
477 .with_context(|| "component.exec requires an operation")?;
478 let mut input = payload.input;
479 if let Value::Object(mut map) = input {
480 map.entry("state".to_string())
481 .or_insert_with(|| state.context());
482 input = Value::Object(map);
483 }
484 let input_json = serde_json::to_string(&input)?;
485 let config_json = if payload.config.is_null() {
486 None
487 } else {
488 Some(serde_json::to_string(&payload.config)?)
489 };
490
491 let pack_idx = *self
492 .flow_sources
493 .get(ctx.flow_id)
494 .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
495 let pack = Arc::clone(&self.packs[pack_idx]);
496 let exec_ctx = component_exec_ctx(ctx, node_id);
497 let value = pack
498 .invoke_component(
499 &component_ref,
500 exec_ctx,
501 &operation,
502 config_json,
503 input_json,
504 )
505 .await?;
506
507 Ok(NodeOutput::new(value))
508 }
509
510 pub fn flows(&self) -> &[FlowDescriptor] {
511 &self.flows
512 }
513
514 pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
515 self.flows
516 .iter()
517 .find(|descriptor| descriptor.flow_type == flow_type)
518 }
519
520 pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
521 self.flows
522 .iter()
523 .find(|descriptor| descriptor.id == flow_id)
524 }
525}
526
/// Hooks for observing the execution of individual flow nodes.
pub trait ExecutionObserver: Send + Sync {
    /// Notification that the node described by `event` is about to be dispatched.
    fn on_node_start(&self, event: &NodeEvent<'_>);
    /// Notification that the node described by `event` produced `output`.
    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
    /// Notification that the node described by `event` failed with `error`.
    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
}
532
/// Borrowed description of a node, handed to an `ExecutionObserver`.
pub struct NodeEvent<'a> {
    /// Context of the flow run the node belongs to.
    pub context: &'a FlowContext<'a>,
    /// Id of the node within its flow.
    pub node_id: &'a str,
    /// The node itself.
    pub node: &'a HostNode,
    /// Payload the node is dispatched with.
    pub payload: &'a Value,
}
539
/// Mutable state accumulated while a flow runs; serializable so it can be stored in a snapshot.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionState {
    /// Input of the current run (replaced when a flow is resumed).
    input: Value,
    /// Output of every node executed so far, keyed by node id.
    nodes: HashMap<String, NodeOutput>,
    /// Payloads emitted by builtin emit nodes, in emission order.
    egress: Vec<Value>,
}
546
547impl ExecutionState {
548 fn new(input: Value) -> Self {
549 Self {
550 input,
551 nodes: HashMap::new(),
552 egress: Vec::new(),
553 }
554 }
555
556 fn context(&self) -> Value {
557 let mut nodes = JsonMap::new();
558 for (id, output) in &self.nodes {
559 nodes.insert(
560 id.clone(),
561 json!({
562 "ok": output.ok,
563 "payload": output.payload.clone(),
564 "meta": output.meta.clone(),
565 }),
566 );
567 }
568 json!({
569 "input": self.input.clone(),
570 "nodes": nodes,
571 })
572 }
573 fn push_egress(&mut self, payload: Value) {
574 self.egress.push(payload);
575 }
576
577 fn replace_input(&mut self, input: Value) {
578 self.input = input;
579 }
580
581 fn clear_egress(&mut self) {
582 self.egress.clear();
583 }
584
585 fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
586 if self.egress.is_empty() {
587 return final_payload.unwrap_or(Value::Null);
588 }
589 let mut emitted = std::mem::take(&mut self.egress);
590 if let Some(value) = final_payload {
591 match value {
592 Value::Null => {}
593 Value::Array(items) => emitted.extend(items),
594 other => emitted.push(other),
595 }
596 }
597 Value::Array(emitted)
598 }
599}
600
/// Recorded result of a single node.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct NodeOutput {
    /// Success flag; `NodeOutput::new` always sets it to `true`.
    ok: bool,
    /// Value produced by the node.
    payload: Value,
    /// Additional metadata; `NodeOutput::new` sets it to null.
    meta: Value,
}
607
608impl NodeOutput {
609 fn new(payload: Value) -> Self {
610 Self {
611 ok: true,
612 payload,
613 meta: Value::Null,
614 }
615 }
616}
617
/// Result of dispatching one node.
struct DispatchOutcome {
    /// Output produced by the node.
    output: NodeOutput,
    /// Set when the node asks the flow to pause; holds the reason for the pause.
    wait_reason: Option<String>,
}
622
623impl DispatchOutcome {
624 fn complete(output: NodeOutput) -> Self {
625 Self {
626 output,
627 wait_reason: None,
628 }
629 }
630
631 fn wait(output: NodeOutput, reason: Option<String>) -> Self {
632 Self {
633 output,
634 wait_reason: reason,
635 }
636 }
637}
638
639fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
640 ComponentExecCtx {
641 tenant: ComponentTenantCtx {
642 tenant: ctx.tenant.to_string(),
643 team: None,
644 user: ctx.provider_id.map(str::to_string),
645 trace_id: None,
646 correlation_id: ctx.session_id.map(str::to_string),
647 deadline_unix_ms: None,
648 attempt: 1,
649 idempotency_key: ctx.session_id.map(str::to_string),
650 },
651 flow_id: ctx.flow_id.to_string(),
652 node_id: Some(node_id.to_string()),
653 }
654}
655
656fn extract_wait_reason(payload: &Value) -> Option<String> {
657 match payload {
658 Value::String(s) => Some(s.clone()),
659 Value::Object(map) => map
660 .get("reason")
661 .and_then(Value::as_str)
662 .map(|value| value.to_string()),
663 _ => None,
664 }
665}
666
667impl From<FlowIR> for HostFlowIR {
668 fn from(value: FlowIR) -> Self {
669 let nodes = value
670 .nodes
671 .into_iter()
672 .map(|(id, node)| (id, HostNode::from(node)))
673 .collect();
674 Self {
675 id: value.id,
676 flow_type: value.flow_type,
677 start: value.start,
678 parameters: value.parameters,
679 nodes,
680 }
681 }
682}
683
684impl From<NodeIR> for HostNode {
685 fn from(node: NodeIR) -> Self {
686 let kind = match node.component.as_str() {
687 "component.exec" => {
688 let target = extract_target_component(&node.payload_expr)
689 .unwrap_or_else(|| "component.exec".to_string());
690 if target.starts_with("emit.") {
691 NodeKind::BuiltinEmit {
692 kind: emit_kind_from_ref(&target),
693 }
694 } else {
695 NodeKind::Exec {
696 target_component: target,
697 }
698 }
699 }
700 "flow.call" => NodeKind::FlowCall,
701 "session.wait" => NodeKind::Wait,
702 comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
703 kind: emit_kind_from_ref(comp),
704 },
705 other => NodeKind::PackComponent {
706 component_ref: other.to_string(),
707 },
708 };
709 let component_label = match &kind {
710 NodeKind::Exec { .. } => "component.exec".to_string(),
711 NodeKind::PackComponent { component_ref } => component_ref.clone(),
712 NodeKind::FlowCall => "flow.call".to_string(),
713 NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
714 NodeKind::Wait => "session.wait".to_string(),
715 };
716 let payload_expr = match kind {
717 NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.payload_expr),
718 _ => node.payload_expr.clone(),
719 };
720 Self {
721 kind,
722 component: component_label,
723 payload_expr,
724 routes: node.routes,
725 }
726 }
727}
728
729fn extract_target_component(payload: &Value) -> Option<String> {
730 match payload {
731 Value::Object(map) => map
732 .get("component")
733 .or_else(|| map.get("component_ref"))
734 .and_then(Value::as_str)
735 .map(|s| s.to_string()),
736 _ => None,
737 }
738}
739
740fn extract_emit_payload(payload: &Value) -> Value {
741 if let Value::Object(map) = payload {
742 if let Some(input) = map.get("input") {
743 return input.clone();
744 }
745 if let Some(inner) = map.get("payload") {
746 return inner.clone();
747 }
748 }
749 payload.clone()
750}
751
752fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
753 match component_ref {
754 "emit.log" => EmitKind::Log,
755 "emit.response" => EmitKind::Response,
756 other => EmitKind::Other(other.to_string()),
757 }
758}
759
760fn emit_ref_from_kind(kind: &EmitKind) -> String {
761 match kind {
762 EmitKind::Log => "emit.log".to_string(),
763 EmitKind::Response => "emit.response".to_string(),
764 EmitKind::Other(other) => other.clone(),
765 }
766}
767
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// The rendered context exposes both the run input and the recorded node outputs.
    #[test]
    fn context_exposes_input_and_node_outputs() {
        let mut state = ExecutionState::new(json!({ "city": "London" }));
        state.nodes.insert(
            "forecast".to_string(),
            NodeOutput::new(json!({ "temp": "20C" })),
        );

        let ctx = state.context();
        assert_eq!(ctx["input"]["city"], json!("London"));
        assert_eq!(ctx["nodes"]["forecast"]["ok"], json!(true));
        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
    }

    /// Emitted payloads come first and a non-array final payload is appended.
    #[test]
    fn finalize_wraps_emitted_payloads() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "first" }));
        state.push_egress(json!({ "text": "second" }));
        let result = state.finalize_with(Some(json!({ "text": "final" })));
        assert_eq!(
            result,
            json!([
                { "text": "first" },
                { "text": "second" },
                { "text": "final" }
            ])
        );
    }

    /// An array final payload is flattened into the emitted payloads.
    #[test]
    fn finalize_flattens_final_array() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "only" }));
        let result = state.finalize_with(Some(json!([
            { "text": "extra-1" },
            { "text": "extra-2" }
        ])));
        assert_eq!(
            result,
            json!([
                { "text": "only" },
                { "text": "extra-1" },
                { "text": "extra-2" }
            ])
        );
    }

    /// Without emitted payloads the final payload is returned as is, or null when absent.
    #[test]
    fn finalize_without_egress_returns_final_payload() {
        let state = ExecutionState::new(json!({}));
        let result = state.clone().finalize_with(Some(json!({ "text": "final" })));
        assert_eq!(result, json!({ "text": "final" }));
        assert_eq!(state.finalize_with(None), Value::Null);
    }
}
820
821use tracing::Instrument;
822
/// Borrowed, per-invocation context of a flow run.
pub struct FlowContext<'a> {
    /// Tenant the flow runs for.
    pub tenant: &'a str,
    /// Id of the flow to run.
    pub flow_id: &'a str,
    /// Node id recorded on the execution span and in the flow context.
    pub node_id: Option<&'a str>,
    /// Tool name recorded on the execution span.
    pub tool: Option<&'a str>,
    /// Action name recorded on the execution span.
    pub action: Option<&'a str>,
    /// Session id; handed to components as correlation id and idempotency key.
    pub session_id: Option<&'a str>,
    /// Provider id; handed to components as the user.
    pub provider_id: Option<&'a str>,
    /// Retry policy applied by `FlowEngine::execute`.
    pub retry_config: RetryConfig,
    /// Optional observer notified about node execution.
    pub observer: Option<&'a dyn ExecutionObserver>,
    /// Optional mock layer; this file only forwards it to sub-flows.
    pub mocks: Option<&'a MockLayer>,
}
835
/// Retry policy for flow execution.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct RetryConfig {
    /// Total number of attempts, including the first one.
    pub max_attempts: u32,
    /// Base delay in milliseconds from which the backoff delay between attempts is derived.
    pub base_delay_ms: u64,
}
841
842fn should_retry(err: &anyhow::Error) -> bool {
843 let lower = err.to_string().to_lowercase();
844 lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
845}
846
847impl From<FlowRetryConfig> for RetryConfig {
848 fn from(value: FlowRetryConfig) -> Self {
849 Self {
850 max_attempts: value.max_attempts.max(1),
851 base_delay_ms: value.base_delay_ms.max(50),
852 }
853 }
854}