1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use crate::config::{FlowRetryConfig, HostConfig};
18use crate::pack::{FlowDescriptor, PackRuntime};
19use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
20use greentic_types::{Flow, Node, NodeId, Routing};
21
/// Registers the flows exposed by a set of packs and executes them node by node.
pub struct FlowEngine {
    /// Pack runtimes; `flow_sources` stores indices into this vector.
    packs: Vec<Arc<PackRuntime>>,
    /// Descriptors of the registered flows, at most one per flow id.
    flows: Vec<FlowDescriptor>,
    /// Maps a flow id to the index (in `packs`) of the pack that provides it.
    flow_sources: HashMap<String, usize>,
    /// Flows already loaded from their pack, keyed by flow id.
    flow_cache: RwLock<HashMap<String, HostFlow>>,
    /// Environment name read from `GREENTIC_ENV` at construction, `"local"` when unset.
    default_env: String,
}
29
/// Serializable state of a paused flow, sufficient to resume it later.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FlowSnapshot {
    /// Id of the flow that was paused; `FlowEngine::resume` rejects a mismatching id.
    pub flow_id: String,
    /// Id of the node at which execution continues on resume.
    pub next_node: String,
    /// Execution state captured at the pause point.
    pub state: ExecutionState,
}
36
/// Details of a flow that paused instead of completing.
#[derive(Clone, Debug)]
pub struct FlowWait {
    /// Reason extracted from the wait node's payload, if any.
    pub reason: Option<String>,
    /// Snapshot to hand back to `FlowEngine::resume`.
    pub snapshot: FlowSnapshot,
}
42
/// Terminal status of a single `execute`/`resume` call.
#[derive(Clone, Debug)]
pub enum FlowStatus {
    /// The flow ran until a node with no successor.
    Completed,
    /// The flow paused and can be continued from the contained snapshot.
    Waiting(FlowWait),
}
48
/// Result of running (or resuming) a flow.
#[derive(Clone, Debug)]
pub struct FlowExecution {
    /// Value produced so far: the final payload, or an array when nodes emitted egress.
    pub output: Value,
    /// Whether the flow completed or is waiting to be resumed.
    pub status: FlowStatus,
}
54
/// Engine-internal representation of a flow, converted from `greentic_types::Flow`.
#[derive(Clone, Debug)]
struct HostFlow {
    /// Flow id as a plain string.
    id: String,
    /// Node at which execution starts, when one could be determined.
    start: Option<NodeId>,
    /// Nodes of the flow keyed by id, in insertion order.
    nodes: IndexMap<NodeId, HostNode>,
}
61
/// Engine-internal representation of a single flow node.
#[derive(Clone, Debug)]
pub struct HostNode {
    /// How the engine dispatches this node.
    kind: NodeKind,
    /// Component label derived from `kind` (e.g. `"component.exec"`, `"flow.call"`).
    pub component: String,
    /// Payload handed to the node when it is dispatched.
    payload_expr: Value,
    /// Routing rule that selects the node executed next.
    routing: Routing,
}
70
/// Dispatch strategy of a node, derived from its component id.
#[derive(Clone, Debug)]
enum NodeKind {
    /// `component.exec` node invoking `target_component` in the owning pack.
    Exec { target_component: String },
    /// Node whose component id names a pack component directly.
    PackComponent { component_ref: String },
    /// `provider.invoke` node.
    ProviderInvoke,
    /// `flow.call` node that runs another flow.
    FlowCall,
    /// `emit.*` node handled by the engine itself.
    BuiltinEmit { kind: EmitKind },
    /// `session.wait` node.
    Wait,
}
80
/// Flavor of a builtin `emit.*` node.
#[derive(Clone, Debug)]
enum EmitKind {
    /// `emit.log`
    Log,
    /// `emit.response`
    Response,
    /// Any other `emit.*` reference, stored verbatim.
    Other(String),
}
87
88impl FlowExecution {
89 fn completed(output: Value) -> Self {
90 Self {
91 output,
92 status: FlowStatus::Completed,
93 }
94 }
95
96 fn waiting(output: Value, wait: FlowWait) -> Self {
97 Self {
98 output,
99 status: FlowStatus::Waiting(wait),
100 }
101 }
102}
103
104impl FlowEngine {
105 pub async fn new(packs: Vec<Arc<PackRuntime>>, _config: Arc<HostConfig>) -> Result<Self> {
106 let mut flow_sources = HashMap::new();
107 let mut descriptors = Vec::new();
108 for (idx, pack) in packs.iter().enumerate() {
109 let flows = pack.list_flows().await?;
110 for flow in flows {
111 tracing::info!(
112 flow_id = %flow.id,
113 flow_type = %flow.flow_type,
114 pack_index = idx,
115 "registered flow"
116 );
117 flow_sources.insert(flow.id.clone(), idx);
118 descriptors.retain(|existing: &FlowDescriptor| existing.id != flow.id);
119 descriptors.push(flow);
120 }
121 }
122
123 let mut flow_map = HashMap::new();
124 for flow in &descriptors {
125 if let Some(&pack_idx) = flow_sources.get(&flow.id) {
126 let pack_clone = Arc::clone(&packs[pack_idx]);
127 let flow_id = flow.id.clone();
128 let task_flow_id = flow_id.clone();
129 match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
130 Ok(Ok(flow)) => {
131 flow_map.insert(flow_id, HostFlow::from(flow));
132 }
133 Ok(Err(err)) => {
134 tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
135 }
136 Err(err) => {
137 tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
138 }
139 }
140 }
141 }
142
143 Ok(Self {
144 packs,
145 flows: descriptors,
146 flow_sources,
147 flow_cache: RwLock::new(flow_map),
148 default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
149 })
150 }
151
152 async fn get_or_load_flow(&self, flow_id: &str) -> Result<HostFlow> {
153 if let Some(flow) = self.flow_cache.read().get(flow_id).cloned() {
154 return Ok(flow);
155 }
156
157 let pack_idx = *self
158 .flow_sources
159 .get(flow_id)
160 .with_context(|| format!("flow {flow_id} not registered"))?;
161 let pack = Arc::clone(&self.packs[pack_idx]);
162 let flow_id_owned = flow_id.to_string();
163 let task_flow_id = flow_id_owned.clone();
164 let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
165 .await
166 .context("failed to join flow metadata task")??;
167 let host_flow = HostFlow::from(flow);
168 self.flow_cache
169 .write()
170 .insert(flow_id_owned.clone(), host_flow.clone());
171 Ok(host_flow)
172 }
173
174 pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
175 let span = tracing::info_span!(
176 "flow.execute",
177 tenant = tracing::field::Empty,
178 flow_id = tracing::field::Empty,
179 node_id = tracing::field::Empty,
180 tool = tracing::field::Empty,
181 action = tracing::field::Empty
182 );
183 annotate_span(
184 &span,
185 &FlowSpanAttributes {
186 tenant: ctx.tenant,
187 flow_id: ctx.flow_id,
188 node_id: ctx.node_id,
189 tool: ctx.tool,
190 action: ctx.action,
191 },
192 );
193 set_flow_context(
194 &self.default_env,
195 ctx.tenant,
196 ctx.flow_id,
197 ctx.node_id,
198 ctx.provider_id,
199 ctx.session_id,
200 );
201 let retry_config = ctx.retry_config;
202 let original_input = input;
203 async move {
204 let mut attempt = 0u32;
205 loop {
206 attempt += 1;
207 match self.execute_once(&ctx, original_input.clone()).await {
208 Ok(value) => return Ok(value),
209 Err(err) => {
210 if attempt >= retry_config.max_attempts || !should_retry(&err) {
211 return Err(err);
212 }
213 let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
214 tracing::warn!(
215 tenant = ctx.tenant,
216 flow_id = ctx.flow_id,
217 attempt,
218 max_attempts = retry_config.max_attempts,
219 delay_ms = delay,
220 error = %err,
221 "transient flow execution failure, backing off"
222 );
223 tokio::time::sleep(Duration::from_millis(delay)).await;
224 }
225 }
226 }
227 }
228 .instrument(span)
229 .await
230 }
231
232 pub async fn resume(
233 &self,
234 ctx: FlowContext<'_>,
235 snapshot: FlowSnapshot,
236 input: Value,
237 ) -> Result<FlowExecution> {
238 if snapshot.flow_id != ctx.flow_id {
239 bail!(
240 "snapshot flow {} does not match requested {}",
241 snapshot.flow_id,
242 ctx.flow_id
243 );
244 }
245 let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
246 let mut state = snapshot.state;
247 state.replace_input(input);
248 self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
249 .await
250 }
251
252 async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
253 let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
254 let state = ExecutionState::new(input);
255 self.drive_flow(ctx, flow_ir, state, None).await
256 }
257
258 async fn drive_flow(
259 &self,
260 ctx: &FlowContext<'_>,
261 flow_ir: HostFlow,
262 mut state: ExecutionState,
263 resume_from: Option<String>,
264 ) -> Result<FlowExecution> {
265 let mut current = match resume_from {
266 Some(node) => NodeId::from_str(&node)
267 .with_context(|| format!("invalid resume node id `{node}`"))?,
268 None => flow_ir
269 .start
270 .clone()
271 .or_else(|| flow_ir.nodes.keys().next().cloned())
272 .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
273 };
274
275 loop {
276 let node = flow_ir
277 .nodes
278 .get(¤t)
279 .with_context(|| format!("node {} not found", current.as_str()))?;
280
281 let payload = node.payload_expr.clone();
282 let observed_payload = payload.clone();
283 let node_id = current.clone();
284 let event = NodeEvent {
285 context: ctx,
286 node_id: node_id.as_str(),
287 node,
288 payload: &observed_payload,
289 };
290 if let Some(observer) = ctx.observer {
291 observer.on_node_start(&event);
292 }
293 let DispatchOutcome {
294 output,
295 wait_reason,
296 } = self
297 .dispatch_node(ctx, node_id.as_str(), node, &mut state, payload)
298 .await?;
299
300 state.nodes.insert(node_id.clone().into(), output.clone());
301
302 let (next, should_exit) = match &node.routing {
303 Routing::Next { node_id } => (Some(node_id.clone()), false),
304 Routing::End | Routing::Reply => (None, true),
305 Routing::Branch { default, .. } => (default.clone(), default.is_none()),
306 Routing::Custom(raw) => {
307 tracing::warn!(
308 flow_id = %flow_ir.id,
309 node_id = %node_id,
310 routing = ?raw,
311 "unsupported routing; terminating flow"
312 );
313 (None, true)
314 }
315 };
316
317 if let Some(wait_reason) = wait_reason {
318 let resume_target = next.clone().ok_or_else(|| {
319 anyhow!(
320 "session.wait node {} requires a non-empty route",
321 current.as_str()
322 )
323 })?;
324 let mut snapshot_state = state.clone();
325 snapshot_state.clear_egress();
326 let snapshot = FlowSnapshot {
327 flow_id: ctx.flow_id.to_string(),
328 next_node: resume_target.as_str().to_string(),
329 state: snapshot_state,
330 };
331 let output_value = state.clone().finalize_with(None);
332 return Ok(FlowExecution::waiting(
333 output_value,
334 FlowWait {
335 reason: Some(wait_reason),
336 snapshot,
337 },
338 ));
339 }
340
341 if should_exit {
342 return Ok(FlowExecution::completed(
343 state.finalize_with(Some(output.payload.clone())),
344 ));
345 }
346
347 match next {
348 Some(n) => current = n,
349 None => {
350 return Ok(FlowExecution::completed(
351 state.finalize_with(Some(output.payload.clone())),
352 ));
353 }
354 }
355 }
356 }
357
358 async fn dispatch_node(
359 &self,
360 ctx: &FlowContext<'_>,
361 node_id: &str,
362 node: &HostNode,
363 state: &mut ExecutionState,
364 payload: Value,
365 ) -> Result<DispatchOutcome> {
366 match &node.kind {
367 NodeKind::Exec { target_component } => self
368 .execute_component_exec(
369 ctx,
370 node_id,
371 state,
372 payload,
373 Some(target_component.as_str()),
374 )
375 .await
376 .map(DispatchOutcome::complete),
377 NodeKind::PackComponent { component_ref } => self
378 .execute_component_exec(ctx, node_id, state, payload, Some(component_ref.as_str()))
379 .await
380 .map(DispatchOutcome::complete),
381 NodeKind::FlowCall => self
382 .execute_flow_call(ctx, payload)
383 .await
384 .map(DispatchOutcome::complete),
385 NodeKind::ProviderInvoke => self
386 .execute_provider_invoke(ctx, node_id, state, payload)
387 .await
388 .map(DispatchOutcome::complete),
389 NodeKind::BuiltinEmit { kind } => {
390 match kind {
391 EmitKind::Log | EmitKind::Response => {}
392 EmitKind::Other(component) => {
393 tracing::debug!(%component, "handling emit.* as builtin");
394 }
395 }
396 state.push_egress(payload.clone());
397 Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
398 }
399 NodeKind::Wait => {
400 let reason = extract_wait_reason(&payload);
401 Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
402 }
403 }
404 }
405
406 async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
407 #[derive(Deserialize)]
408 struct FlowCallPayload {
409 #[serde(alias = "flow")]
410 flow_id: String,
411 #[serde(default)]
412 input: Value,
413 }
414
415 let call: FlowCallPayload =
416 serde_json::from_value(payload).context("invalid payload for flow.call node")?;
417 if call.flow_id.trim().is_empty() {
418 bail!("flow.call requires a non-empty flow_id");
419 }
420
421 let sub_input = if call.input.is_null() {
422 Value::Null
423 } else {
424 call.input
425 };
426
427 let flow_id_owned = call.flow_id;
428 let action = "flow.call";
429 let sub_ctx = FlowContext {
430 tenant: ctx.tenant,
431 flow_id: flow_id_owned.as_str(),
432 node_id: None,
433 tool: ctx.tool,
434 action: Some(action),
435 session_id: ctx.session_id,
436 provider_id: ctx.provider_id,
437 retry_config: ctx.retry_config,
438 observer: ctx.observer,
439 mocks: ctx.mocks,
440 };
441
442 let execution = Box::pin(self.execute(sub_ctx, sub_input))
443 .await
444 .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
445 match execution.status {
446 FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
447 FlowStatus::Waiting(wait) => bail!(
448 "flow.call cannot pause (flow {} waiting {:?})",
449 flow_id_owned,
450 wait.reason
451 ),
452 }
453 }
454
455 async fn execute_component_exec(
456 &self,
457 ctx: &FlowContext<'_>,
458 node_id: &str,
459 state: &ExecutionState,
460 payload: Value,
461 component_override: Option<&str>,
462 ) -> Result<NodeOutput> {
463 #[derive(Deserialize)]
464 struct ComponentPayload {
465 #[serde(default, alias = "component_ref", alias = "component")]
466 component: Option<String>,
467 #[serde(alias = "op")]
468 operation: Option<String>,
469 #[serde(default)]
470 input: Value,
471 #[serde(default)]
472 config: Value,
473 }
474
475 let payload: ComponentPayload =
476 serde_json::from_value(payload).context("invalid payload for component.exec")?;
477 let component_ref = component_override
478 .map(str::to_string)
479 .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
480 .with_context(|| "component.exec requires a component_ref")?;
481 let operation = payload
482 .operation
483 .filter(|v| !v.trim().is_empty())
484 .with_context(|| "component.exec requires an operation")?;
485 let mut input = payload.input;
486 if let Value::Object(mut map) = input {
487 map.entry("state".to_string())
488 .or_insert_with(|| state.context());
489 input = Value::Object(map);
490 }
491 let input_json = serde_json::to_string(&input)?;
492 let config_json = if payload.config.is_null() {
493 None
494 } else {
495 Some(serde_json::to_string(&payload.config)?)
496 };
497
498 let pack_idx = *self
499 .flow_sources
500 .get(ctx.flow_id)
501 .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
502 let pack = Arc::clone(&self.packs[pack_idx]);
503 let exec_ctx = component_exec_ctx(ctx, node_id);
504 let value = pack
505 .invoke_component(
506 &component_ref,
507 exec_ctx,
508 &operation,
509 config_json,
510 input_json,
511 )
512 .await?;
513
514 Ok(NodeOutput::new(value))
515 }
516
517 async fn execute_provider_invoke(
518 &self,
519 ctx: &FlowContext<'_>,
520 node_id: &str,
521 state: &ExecutionState,
522 payload: Value,
523 ) -> Result<NodeOutput> {
524 #[derive(Deserialize)]
525 struct ProviderPayload {
526 #[serde(default)]
527 provider_id: Option<String>,
528 #[serde(default)]
529 provider_type: Option<String>,
530 #[serde(default, alias = "operation")]
531 op: Option<String>,
532 #[serde(default)]
533 input: Value,
534 #[serde(default)]
535 in_map: Value,
536 #[serde(default)]
537 out_map: Value,
538 #[serde(default)]
539 err_map: Value,
540 }
541
542 let payload: ProviderPayload =
543 serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
544 let op = payload
545 .op
546 .as_deref()
547 .filter(|v| !v.trim().is_empty())
548 .with_context(|| "provider.invoke requires an op")?
549 .to_string();
550
551 let mut input_value = if !payload.in_map.is_null() {
552 let ctx_value = mapping_ctx(payload.input.clone(), state);
553 apply_mapping(&payload.in_map, &ctx_value)
554 .context("failed to evaluate provider.invoke in_map")?
555 } else if !payload.input.is_null() {
556 payload.input
557 } else {
558 Value::Null
559 };
560
561 if let Value::Object(ref mut map) = input_value {
562 map.entry("state".to_string())
563 .or_insert_with(|| state.context());
564 }
565 let input_json = serde_json::to_vec(&input_value)?;
566
567 let pack_idx = *self
568 .flow_sources
569 .get(ctx.flow_id)
570 .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
571 let pack = Arc::clone(&self.packs[pack_idx]);
572 let binding = pack.resolve_provider(
573 payload.provider_id.as_deref(),
574 payload.provider_type.as_deref(),
575 )?;
576 let exec_ctx = component_exec_ctx(ctx, node_id);
577 let result = pack
578 .invoke_provider(&binding, exec_ctx, &op, input_json)
579 .await?;
580
581 let output = if payload.out_map.is_null() {
582 result
583 } else {
584 let ctx_value = mapping_ctx(result, state);
585 apply_mapping(&payload.out_map, &ctx_value)
586 .context("failed to evaluate provider.invoke out_map")?
587 };
588 let _ = payload.err_map;
589 Ok(NodeOutput::new(output))
590 }
591
592 pub fn flows(&self) -> &[FlowDescriptor] {
593 &self.flows
594 }
595
596 pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
597 self.flows
598 .iter()
599 .find(|descriptor| descriptor.flow_type == flow_type)
600 }
601
602 pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
603 self.flows
604 .iter()
605 .find(|descriptor| descriptor.id == flow_id)
606 }
607}
608
/// Hooks for observing node execution; implementations must be `Send + Sync`.
pub trait ExecutionObserver: Send + Sync {
    /// Called before a node is dispatched.
    fn on_node_start(&self, event: &NodeEvent<'_>);
    /// Hook for a node that finished successfully; `output` is the node's output value.
    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
    /// Hook for a node that failed with `error`.
    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
}
614
/// Borrowed view of a node execution handed to `ExecutionObserver` hooks.
pub struct NodeEvent<'a> {
    /// Context of the flow run the node belongs to.
    pub context: &'a FlowContext<'a>,
    /// Id of the node being executed.
    pub node_id: &'a str,
    /// The node definition.
    pub node: &'a HostNode,
    /// Payload the node is dispatched with.
    pub payload: &'a Value,
}
621
/// Mutable state of one flow run; serializable so it can travel in a `FlowSnapshot`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionState {
    /// Input of the run (replaced on resume).
    input: Value,
    /// Outputs of the nodes executed so far, keyed by node id.
    nodes: HashMap<String, NodeOutput>,
    /// Payloads emitted by builtin emit nodes, in emission order.
    egress: Vec<Value>,
}
628
629impl ExecutionState {
630 fn new(input: Value) -> Self {
631 Self {
632 input,
633 nodes: HashMap::new(),
634 egress: Vec::new(),
635 }
636 }
637
638 fn context(&self) -> Value {
639 let mut nodes = JsonMap::new();
640 for (id, output) in &self.nodes {
641 nodes.insert(
642 id.clone(),
643 json!({
644 "ok": output.ok,
645 "payload": output.payload.clone(),
646 "meta": output.meta.clone(),
647 }),
648 );
649 }
650 json!({
651 "input": self.input.clone(),
652 "nodes": nodes,
653 })
654 }
655 fn push_egress(&mut self, payload: Value) {
656 self.egress.push(payload);
657 }
658
659 fn replace_input(&mut self, input: Value) {
660 self.input = input;
661 }
662
663 fn clear_egress(&mut self) {
664 self.egress.clear();
665 }
666
667 fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
668 if self.egress.is_empty() {
669 return final_payload.unwrap_or(Value::Null);
670 }
671 let mut emitted = std::mem::take(&mut self.egress);
672 if let Some(value) = final_payload {
673 match value {
674 Value::Null => {}
675 Value::Array(items) => emitted.extend(items),
676 other => emitted.push(other),
677 }
678 }
679 Value::Array(emitted)
680 }
681}
682
/// Output recorded for an executed node.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct NodeOutput {
    /// Success flag; `NodeOutput::new` always sets it to `true`.
    ok: bool,
    /// Value produced by the node.
    payload: Value,
    /// Additional metadata; `NodeOutput::new` sets it to null.
    meta: Value,
}
689
690impl NodeOutput {
691 fn new(payload: Value) -> Self {
692 Self {
693 ok: true,
694 payload,
695 meta: Value::Null,
696 }
697 }
698}
699
/// Result of dispatching a single node.
struct DispatchOutcome {
    /// Output produced by the node.
    output: NodeOutput,
    /// Reason reported by a wait node, if any; `None` for outcomes built with `complete`.
    wait_reason: Option<String>,
}
704
705impl DispatchOutcome {
706 fn complete(output: NodeOutput) -> Self {
707 Self {
708 output,
709 wait_reason: None,
710 }
711 }
712
713 fn wait(output: NodeOutput, reason: Option<String>) -> Self {
714 Self {
715 output,
716 wait_reason: reason,
717 }
718 }
719}
720
721fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
722 ComponentExecCtx {
723 tenant: ComponentTenantCtx {
724 tenant: ctx.tenant.to_string(),
725 team: None,
726 user: ctx.provider_id.map(str::to_string),
727 trace_id: None,
728 correlation_id: ctx.session_id.map(str::to_string),
729 deadline_unix_ms: None,
730 attempt: 1,
731 idempotency_key: ctx.session_id.map(str::to_string),
732 },
733 flow_id: ctx.flow_id.to_string(),
734 node_id: Some(node_id.to_string()),
735 }
736}
737
738fn extract_wait_reason(payload: &Value) -> Option<String> {
739 match payload {
740 Value::String(s) => Some(s.clone()),
741 Value::Object(map) => map
742 .get("reason")
743 .and_then(Value::as_str)
744 .map(|value| value.to_string()),
745 _ => None,
746 }
747}
748
749fn mapping_ctx(root: Value, state: &ExecutionState) -> Value {
750 json!({
751 "input": root.clone(),
752 "result": root,
753 "state": state.context(),
754 })
755}
756
757fn apply_mapping(template: &Value, ctx: &Value) -> Result<Value> {
758 match template {
759 Value::String(path) if path.starts_with('/') => ctx
760 .pointer(path)
761 .cloned()
762 .ok_or_else(|| anyhow!("mapping path `{path}` not found")),
763 Value::Array(items) => {
764 let mut mapped = Vec::with_capacity(items.len());
765 for item in items {
766 mapped.push(apply_mapping(item, ctx)?);
767 }
768 Ok(Value::Array(mapped))
769 }
770 Value::Object(map) => {
771 let mut out = serde_json::Map::new();
772 for (key, value) in map {
773 out.insert(key.clone(), apply_mapping(value, ctx)?);
774 }
775 Ok(Value::Object(out))
776 }
777 other => Ok(other.clone()),
778 }
779}
780
781impl From<Flow> for HostFlow {
782 fn from(value: Flow) -> Self {
783 let mut nodes = IndexMap::new();
784 for (id, node) in value.nodes {
785 nodes.insert(id.clone(), HostNode::from(node));
786 }
787 let start = value
788 .entrypoints
789 .get("default")
790 .and_then(Value::as_str)
791 .and_then(|id| NodeId::from_str(id).ok())
792 .or_else(|| nodes.keys().next().cloned());
793 Self {
794 id: value.id.as_str().to_string(),
795 start,
796 nodes,
797 }
798 }
799}
800
801impl From<Node> for HostNode {
802 fn from(node: Node) -> Self {
803 let component_ref = node.component.id.as_str().to_string();
804 let kind = match component_ref.as_str() {
805 "component.exec" => {
806 let target = extract_target_component(&node.input.mapping)
807 .unwrap_or_else(|| "component.exec".to_string());
808 if target.starts_with("emit.") {
809 NodeKind::BuiltinEmit {
810 kind: emit_kind_from_ref(&target),
811 }
812 } else {
813 NodeKind::Exec {
814 target_component: target,
815 }
816 }
817 }
818 "flow.call" => NodeKind::FlowCall,
819 "provider.invoke" => NodeKind::ProviderInvoke,
820 "session.wait" => NodeKind::Wait,
821 comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
822 kind: emit_kind_from_ref(comp),
823 },
824 other => NodeKind::PackComponent {
825 component_ref: other.to_string(),
826 },
827 };
828 let component_label = match &kind {
829 NodeKind::Exec { .. } => "component.exec".to_string(),
830 NodeKind::PackComponent { component_ref } => component_ref.clone(),
831 NodeKind::ProviderInvoke => "provider.invoke".to_string(),
832 NodeKind::FlowCall => "flow.call".to_string(),
833 NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
834 NodeKind::Wait => "session.wait".to_string(),
835 };
836 let payload_expr = match kind {
837 NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
838 _ => node.input.mapping.clone(),
839 };
840 Self {
841 kind,
842 component: component_label,
843 payload_expr,
844 routing: node.routing,
845 }
846 }
847}
848
849fn extract_target_component(payload: &Value) -> Option<String> {
850 match payload {
851 Value::Object(map) => map
852 .get("component")
853 .or_else(|| map.get("component_ref"))
854 .and_then(Value::as_str)
855 .map(|s| s.to_string()),
856 _ => None,
857 }
858}
859
860fn extract_emit_payload(payload: &Value) -> Value {
861 if let Value::Object(map) = payload {
862 if let Some(input) = map.get("input") {
863 return input.clone();
864 }
865 if let Some(inner) = map.get("payload") {
866 return inner.clone();
867 }
868 }
869 payload.clone()
870}
871
872fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
873 match component_ref {
874 "emit.log" => EmitKind::Log,
875 "emit.response" => EmitKind::Response,
876 other => EmitKind::Other(other.to_string()),
877 }
878}
879
880fn emit_ref_from_kind(kind: &EmitKind) -> String {
881 match kind {
882 EmitKind::Log => "emit.log".to_string(),
883 EmitKind::Response => "emit.response".to_string(),
884 EmitKind::Other(other) => other.clone(),
885 }
886}
887
// Unit tests for `ExecutionState`: context rendering and output finalization.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Outputs recorded for a node must be reachable under
    // `nodes.<id>.payload` in the rendered context.
    #[test]
    fn templating_renders_with_partials_and_data() {
        let mut state = ExecutionState::new(json!({ "city": "London" }));
        state.nodes.insert(
            "forecast".to_string(),
            NodeOutput::new(json!({ "temp": "20C" })),
        );

        let ctx = state.context();
        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
    }

    // With emitted payloads, a non-array final payload is appended to them.
    #[test]
    fn finalize_wraps_emitted_payloads() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "first" }));
        state.push_egress(json!({ "text": "second" }));
        let result = state.finalize_with(Some(json!({ "text": "final" })));
        assert_eq!(
            result,
            json!([
                { "text": "first" },
                { "text": "second" },
                { "text": "final" }
            ])
        );
    }

    // An array final payload is flattened into the emitted payloads rather
    // than nested as a single element.
    #[test]
    fn finalize_flattens_final_array() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "only" }));
        let result = state.finalize_with(Some(json!([
            { "text": "extra-1" },
            { "text": "extra-2" }
        ])));
        assert_eq!(
            result,
            json!([
                { "text": "only" },
                { "text": "extra-1" },
                { "text": "extra-2" }
            ])
        );
    }
}
940
941use tracing::Instrument;
942
/// Borrowed, per-call context for executing or resuming a flow.
pub struct FlowContext<'a> {
    /// Tenant on whose behalf the flow runs.
    pub tenant: &'a str,
    /// Id of the flow to execute.
    pub flow_id: &'a str,
    /// Node id recorded on the span and flow context, if any.
    pub node_id: Option<&'a str>,
    /// Tool name recorded on the span, if any.
    pub tool: Option<&'a str>,
    /// Action name recorded on the span, if any (`"flow.call"` for sub-flows).
    pub action: Option<&'a str>,
    /// Session id; forwarded to components as correlation id and idempotency key.
    pub session_id: Option<&'a str>,
    /// Provider id; forwarded to components in the `user` field.
    pub provider_id: Option<&'a str>,
    /// Retry policy applied by `FlowEngine::execute`.
    pub retry_config: RetryConfig,
    /// Optional observer notified about node execution.
    pub observer: Option<&'a dyn ExecutionObserver>,
    /// Optional mock layer, passed through to sub-flows.
    pub mocks: Option<&'a MockLayer>,
}
955
/// Retry policy for flow execution.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Copy`/`Clone` so the
/// policy can be logged and compared in tests.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct RetryConfig {
    /// Maximum number of attempts, including the first one.
    pub max_attempts: u32,
    /// Base delay in milliseconds fed into the backoff computation.
    pub base_delay_ms: u64,
}
961
962fn should_retry(err: &anyhow::Error) -> bool {
963 let lower = err.to_string().to_lowercase();
964 lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
965}
966
967impl From<FlowRetryConfig> for RetryConfig {
968 fn from(value: FlowRetryConfig) -> Self {
969 Self {
970 max_attempts: value.max_attempts.max(1),
971 base_delay_ms: value.base_delay_ms.max(50),
972 }
973 }
974}