1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use crate::config::{FlowRetryConfig, HostConfig};
18use crate::pack::{FlowDescriptor, PackRuntime};
19use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
20use greentic_types::{Flow, Node, NodeId, Routing};
21
22pub struct FlowEngine {
23 packs: Vec<Arc<PackRuntime>>,
24 flows: Vec<FlowDescriptor>,
25 flow_sources: HashMap<String, usize>,
26 flow_cache: RwLock<HashMap<String, HostFlow>>,
27 default_env: String,
28}
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
31pub struct FlowSnapshot {
32 pub flow_id: String,
33 pub next_node: String,
34 pub state: ExecutionState,
35}
36
37#[derive(Clone, Debug)]
38pub struct FlowWait {
39 pub reason: Option<String>,
40 pub snapshot: FlowSnapshot,
41}
42
43#[derive(Clone, Debug)]
44pub enum FlowStatus {
45 Completed,
46 Waiting(FlowWait),
47}
48
49#[derive(Clone, Debug)]
50pub struct FlowExecution {
51 pub output: Value,
52 pub status: FlowStatus,
53}
54
55#[derive(Clone, Debug)]
56struct HostFlow {
57 id: String,
58 start: Option<NodeId>,
59 nodes: IndexMap<NodeId, HostNode>,
60}
61
62#[derive(Clone, Debug)]
63pub struct HostNode {
64 kind: NodeKind,
65 pub component: String,
67 operation_in_mapping: Option<String>,
68 payload_expr: Value,
69 routing: Routing,
70}
71
72#[derive(Clone, Debug)]
73enum NodeKind {
74 Exec { target_component: String },
75 PackComponent { component_ref: String },
76 ProviderInvoke,
77 FlowCall,
78 BuiltinEmit { kind: EmitKind },
79 Wait,
80}
81
/// Flavour of a builtin `emit.*` node.
#[derive(Debug, Clone)]
enum EmitKind {
    /// `emit.log`
    Log,
    /// `emit.response`
    Response,
    /// Any other `emit.*` reference, stored verbatim.
    Other(String),
}
88
89impl FlowExecution {
90 fn completed(output: Value) -> Self {
91 Self {
92 output,
93 status: FlowStatus::Completed,
94 }
95 }
96
97 fn waiting(output: Value, wait: FlowWait) -> Self {
98 Self {
99 output,
100 status: FlowStatus::Waiting(wait),
101 }
102 }
103}
104
105impl FlowEngine {
106 pub async fn new(packs: Vec<Arc<PackRuntime>>, _config: Arc<HostConfig>) -> Result<Self> {
107 let mut flow_sources = HashMap::new();
108 let mut descriptors = Vec::new();
109 for (idx, pack) in packs.iter().enumerate() {
110 let flows = pack.list_flows().await?;
111 for flow in flows {
112 tracing::info!(
113 flow_id = %flow.id,
114 flow_type = %flow.flow_type,
115 pack_index = idx,
116 "registered flow"
117 );
118 flow_sources.insert(flow.id.clone(), idx);
119 descriptors.retain(|existing: &FlowDescriptor| existing.id != flow.id);
120 descriptors.push(flow);
121 }
122 }
123
124 let mut flow_map = HashMap::new();
125 for flow in &descriptors {
126 if let Some(&pack_idx) = flow_sources.get(&flow.id) {
127 let pack_clone = Arc::clone(&packs[pack_idx]);
128 let flow_id = flow.id.clone();
129 let task_flow_id = flow_id.clone();
130 match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
131 Ok(Ok(flow)) => {
132 flow_map.insert(flow_id, HostFlow::from(flow));
133 }
134 Ok(Err(err)) => {
135 tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
136 }
137 Err(err) => {
138 tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
139 }
140 }
141 }
142 }
143
144 Ok(Self {
145 packs,
146 flows: descriptors,
147 flow_sources,
148 flow_cache: RwLock::new(flow_map),
149 default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
150 })
151 }
152
153 async fn get_or_load_flow(&self, flow_id: &str) -> Result<HostFlow> {
154 if let Some(flow) = self.flow_cache.read().get(flow_id).cloned() {
155 return Ok(flow);
156 }
157
158 let pack_idx = *self
159 .flow_sources
160 .get(flow_id)
161 .with_context(|| format!("flow {flow_id} not registered"))?;
162 let pack = Arc::clone(&self.packs[pack_idx]);
163 let flow_id_owned = flow_id.to_string();
164 let task_flow_id = flow_id_owned.clone();
165 let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
166 .await
167 .context("failed to join flow metadata task")??;
168 let host_flow = HostFlow::from(flow);
169 self.flow_cache
170 .write()
171 .insert(flow_id_owned.clone(), host_flow.clone());
172 Ok(host_flow)
173 }
174
175 pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
176 let span = tracing::info_span!(
177 "flow.execute",
178 tenant = tracing::field::Empty,
179 flow_id = tracing::field::Empty,
180 node_id = tracing::field::Empty,
181 tool = tracing::field::Empty,
182 action = tracing::field::Empty
183 );
184 annotate_span(
185 &span,
186 &FlowSpanAttributes {
187 tenant: ctx.tenant,
188 flow_id: ctx.flow_id,
189 node_id: ctx.node_id,
190 tool: ctx.tool,
191 action: ctx.action,
192 },
193 );
194 set_flow_context(
195 &self.default_env,
196 ctx.tenant,
197 ctx.flow_id,
198 ctx.node_id,
199 ctx.provider_id,
200 ctx.session_id,
201 );
202 let retry_config = ctx.retry_config;
203 let original_input = input;
204 async move {
205 let mut attempt = 0u32;
206 loop {
207 attempt += 1;
208 match self.execute_once(&ctx, original_input.clone()).await {
209 Ok(value) => return Ok(value),
210 Err(err) => {
211 if attempt >= retry_config.max_attempts || !should_retry(&err) {
212 return Err(err);
213 }
214 let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
215 tracing::warn!(
216 tenant = ctx.tenant,
217 flow_id = ctx.flow_id,
218 attempt,
219 max_attempts = retry_config.max_attempts,
220 delay_ms = delay,
221 error = %err,
222 "transient flow execution failure, backing off"
223 );
224 tokio::time::sleep(Duration::from_millis(delay)).await;
225 }
226 }
227 }
228 }
229 .instrument(span)
230 .await
231 }
232
233 pub async fn resume(
234 &self,
235 ctx: FlowContext<'_>,
236 snapshot: FlowSnapshot,
237 input: Value,
238 ) -> Result<FlowExecution> {
239 if snapshot.flow_id != ctx.flow_id {
240 bail!(
241 "snapshot flow {} does not match requested {}",
242 snapshot.flow_id,
243 ctx.flow_id
244 );
245 }
246 let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
247 let mut state = snapshot.state;
248 state.replace_input(input);
249 self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
250 .await
251 }
252
253 async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
254 let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
255 let state = ExecutionState::new(input);
256 self.drive_flow(ctx, flow_ir, state, None).await
257 }
258
259 async fn drive_flow(
260 &self,
261 ctx: &FlowContext<'_>,
262 flow_ir: HostFlow,
263 mut state: ExecutionState,
264 resume_from: Option<String>,
265 ) -> Result<FlowExecution> {
266 let mut current = match resume_from {
267 Some(node) => NodeId::from_str(&node)
268 .with_context(|| format!("invalid resume node id `{node}`"))?,
269 None => flow_ir
270 .start
271 .clone()
272 .or_else(|| flow_ir.nodes.keys().next().cloned())
273 .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
274 };
275
276 loop {
277 let node = flow_ir
278 .nodes
279 .get(¤t)
280 .with_context(|| format!("node {} not found", current.as_str()))?;
281
282 let payload = node.payload_expr.clone();
283 let observed_payload = payload.clone();
284 let node_id = current.clone();
285 let event = NodeEvent {
286 context: ctx,
287 node_id: node_id.as_str(),
288 node,
289 payload: &observed_payload,
290 };
291 if let Some(observer) = ctx.observer {
292 observer.on_node_start(&event);
293 }
294 let DispatchOutcome {
295 output,
296 wait_reason,
297 } = self
298 .dispatch_node(ctx, node_id.as_str(), node, &mut state, payload)
299 .await?;
300
301 state.nodes.insert(node_id.clone().into(), output.clone());
302
303 let (next, should_exit) = match &node.routing {
304 Routing::Next { node_id } => (Some(node_id.clone()), false),
305 Routing::End | Routing::Reply => (None, true),
306 Routing::Branch { default, .. } => (default.clone(), default.is_none()),
307 Routing::Custom(raw) => {
308 tracing::warn!(
309 flow_id = %flow_ir.id,
310 node_id = %node_id,
311 routing = ?raw,
312 "unsupported routing; terminating flow"
313 );
314 (None, true)
315 }
316 };
317
318 if let Some(wait_reason) = wait_reason {
319 let resume_target = next.clone().ok_or_else(|| {
320 anyhow!(
321 "session.wait node {} requires a non-empty route",
322 current.as_str()
323 )
324 })?;
325 let mut snapshot_state = state.clone();
326 snapshot_state.clear_egress();
327 let snapshot = FlowSnapshot {
328 flow_id: ctx.flow_id.to_string(),
329 next_node: resume_target.as_str().to_string(),
330 state: snapshot_state,
331 };
332 let output_value = state.clone().finalize_with(None);
333 return Ok(FlowExecution::waiting(
334 output_value,
335 FlowWait {
336 reason: Some(wait_reason),
337 snapshot,
338 },
339 ));
340 }
341
342 if should_exit {
343 return Ok(FlowExecution::completed(
344 state.finalize_with(Some(output.payload.clone())),
345 ));
346 }
347
348 match next {
349 Some(n) => current = n,
350 None => {
351 return Ok(FlowExecution::completed(
352 state.finalize_with(Some(output.payload.clone())),
353 ));
354 }
355 }
356 }
357 }
358
359 async fn dispatch_node(
360 &self,
361 ctx: &FlowContext<'_>,
362 node_id: &str,
363 node: &HostNode,
364 state: &mut ExecutionState,
365 payload: Value,
366 ) -> Result<DispatchOutcome> {
367 match &node.kind {
368 NodeKind::Exec { target_component } => self
369 .execute_component_exec(
370 ctx,
371 node_id,
372 node,
373 state,
374 payload,
375 Some(target_component.as_str()),
376 )
377 .await
378 .map(DispatchOutcome::complete),
379 NodeKind::PackComponent { component_ref } => self
380 .execute_component_exec(
381 ctx,
382 node_id,
383 node,
384 state,
385 payload,
386 Some(component_ref.as_str()),
387 )
388 .await
389 .map(DispatchOutcome::complete),
390 NodeKind::FlowCall => self
391 .execute_flow_call(ctx, payload)
392 .await
393 .map(DispatchOutcome::complete),
394 NodeKind::ProviderInvoke => self
395 .execute_provider_invoke(ctx, node_id, state, payload)
396 .await
397 .map(DispatchOutcome::complete),
398 NodeKind::BuiltinEmit { kind } => {
399 match kind {
400 EmitKind::Log | EmitKind::Response => {}
401 EmitKind::Other(component) => {
402 tracing::debug!(%component, "handling emit.* as builtin");
403 }
404 }
405 state.push_egress(payload.clone());
406 Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
407 }
408 NodeKind::Wait => {
409 let reason = extract_wait_reason(&payload);
410 Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
411 }
412 }
413 }
414
415 async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
416 #[derive(Deserialize)]
417 struct FlowCallPayload {
418 #[serde(alias = "flow")]
419 flow_id: String,
420 #[serde(default)]
421 input: Value,
422 }
423
424 let call: FlowCallPayload =
425 serde_json::from_value(payload).context("invalid payload for flow.call node")?;
426 if call.flow_id.trim().is_empty() {
427 bail!("flow.call requires a non-empty flow_id");
428 }
429
430 let sub_input = if call.input.is_null() {
431 Value::Null
432 } else {
433 call.input
434 };
435
436 let flow_id_owned = call.flow_id;
437 let action = "flow.call";
438 let sub_ctx = FlowContext {
439 tenant: ctx.tenant,
440 flow_id: flow_id_owned.as_str(),
441 node_id: None,
442 tool: ctx.tool,
443 action: Some(action),
444 session_id: ctx.session_id,
445 provider_id: ctx.provider_id,
446 retry_config: ctx.retry_config,
447 observer: ctx.observer,
448 mocks: ctx.mocks,
449 };
450
451 let execution = Box::pin(self.execute(sub_ctx, sub_input))
452 .await
453 .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
454 match execution.status {
455 FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
456 FlowStatus::Waiting(wait) => bail!(
457 "flow.call cannot pause (flow {} waiting {:?})",
458 flow_id_owned,
459 wait.reason
460 ),
461 }
462 }
463
464 async fn execute_component_exec(
465 &self,
466 ctx: &FlowContext<'_>,
467 node_id: &str,
468 node: &HostNode,
469 state: &ExecutionState,
470 payload: Value,
471 component_override: Option<&str>,
472 ) -> Result<NodeOutput> {
473 #[derive(Deserialize)]
474 struct ComponentPayload {
475 #[serde(default, alias = "component_ref", alias = "component")]
476 component: Option<String>,
477 #[serde(alias = "op")]
478 operation: Option<String>,
479 #[serde(default)]
480 input: Value,
481 #[serde(default)]
482 config: Value,
483 }
484
485 let payload: ComponentPayload =
486 serde_json::from_value(payload).context("invalid payload for component.exec")?;
487 let component_ref = component_override
488 .map(str::to_string)
489 .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
490 .with_context(|| "component.exec requires a component_ref")?;
491 let component_label = node.component.as_str();
492 let operation = match payload.operation.filter(|v| !v.trim().is_empty()) {
493 Some(op) => op,
494 None => {
495 let mut message = format!(
496 "missing operation for node `{}` (component `{}`); expected node.component.operation to be set",
497 node_id, component_label,
498 );
499 if let Some(found) = &node.operation_in_mapping {
500 message.push_str(&format!(
501 ". Found operation in input.mapping (`{}`) but this is not used; pack compiler must preserve node.component.operation.",
502 found
503 ));
504 }
505 bail!(message);
506 }
507 };
508 let mut input = payload.input;
509 if let Value::Object(mut map) = input {
510 map.entry("state".to_string())
511 .or_insert_with(|| state.context());
512 input = Value::Object(map);
513 }
514 let input_json = serde_json::to_string(&input)?;
515 let config_json = if payload.config.is_null() {
516 None
517 } else {
518 Some(serde_json::to_string(&payload.config)?)
519 };
520
521 let pack_idx = *self
522 .flow_sources
523 .get(ctx.flow_id)
524 .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
525 let pack = Arc::clone(&self.packs[pack_idx]);
526 let exec_ctx = component_exec_ctx(ctx, node_id);
527 let value = pack
528 .invoke_component(
529 &component_ref,
530 exec_ctx,
531 &operation,
532 config_json,
533 input_json,
534 )
535 .await?;
536
537 Ok(NodeOutput::new(value))
538 }
539
540 async fn execute_provider_invoke(
541 &self,
542 ctx: &FlowContext<'_>,
543 node_id: &str,
544 state: &ExecutionState,
545 payload: Value,
546 ) -> Result<NodeOutput> {
547 #[derive(Deserialize)]
548 struct ProviderPayload {
549 #[serde(default)]
550 provider_id: Option<String>,
551 #[serde(default)]
552 provider_type: Option<String>,
553 #[serde(default, alias = "operation")]
554 op: Option<String>,
555 #[serde(default)]
556 input: Value,
557 #[serde(default)]
558 in_map: Value,
559 #[serde(default)]
560 out_map: Value,
561 #[serde(default)]
562 err_map: Value,
563 }
564
565 let payload: ProviderPayload =
566 serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
567 let op = payload
568 .op
569 .as_deref()
570 .filter(|v| !v.trim().is_empty())
571 .with_context(|| "provider.invoke requires an op")?
572 .to_string();
573
574 let mut input_value = if !payload.in_map.is_null() {
575 let ctx_value = mapping_ctx(payload.input.clone(), state);
576 apply_mapping(&payload.in_map, &ctx_value)
577 .context("failed to evaluate provider.invoke in_map")?
578 } else if !payload.input.is_null() {
579 payload.input
580 } else {
581 Value::Null
582 };
583
584 if let Value::Object(ref mut map) = input_value {
585 map.entry("state".to_string())
586 .or_insert_with(|| state.context());
587 }
588 let input_json = serde_json::to_vec(&input_value)?;
589
590 let pack_idx = *self
591 .flow_sources
592 .get(ctx.flow_id)
593 .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
594 let pack = Arc::clone(&self.packs[pack_idx]);
595 let binding = pack.resolve_provider(
596 payload.provider_id.as_deref(),
597 payload.provider_type.as_deref(),
598 )?;
599 let exec_ctx = component_exec_ctx(ctx, node_id);
600 let result = pack
601 .invoke_provider(&binding, exec_ctx, &op, input_json)
602 .await?;
603
604 let output = if payload.out_map.is_null() {
605 result
606 } else {
607 let ctx_value = mapping_ctx(result, state);
608 apply_mapping(&payload.out_map, &ctx_value)
609 .context("failed to evaluate provider.invoke out_map")?
610 };
611 let _ = payload.err_map;
612 Ok(NodeOutput::new(output))
613 }
614
615 pub fn flows(&self) -> &[FlowDescriptor] {
616 &self.flows
617 }
618
619 pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
620 self.flows
621 .iter()
622 .find(|descriptor| descriptor.flow_type == flow_type)
623 }
624
625 pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
626 self.flows
627 .iter()
628 .find(|descriptor| descriptor.id == flow_id)
629 }
630}
631
632pub trait ExecutionObserver: Send + Sync {
633 fn on_node_start(&self, event: &NodeEvent<'_>);
634 fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
635 fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
636}
637
638pub struct NodeEvent<'a> {
639 pub context: &'a FlowContext<'a>,
640 pub node_id: &'a str,
641 pub node: &'a HostNode,
642 pub payload: &'a Value,
643}
644
645#[derive(Clone, Debug, Serialize, Deserialize)]
646pub struct ExecutionState {
647 input: Value,
648 nodes: HashMap<String, NodeOutput>,
649 egress: Vec<Value>,
650}
651
652impl ExecutionState {
653 fn new(input: Value) -> Self {
654 Self {
655 input,
656 nodes: HashMap::new(),
657 egress: Vec::new(),
658 }
659 }
660
661 fn context(&self) -> Value {
662 let mut nodes = JsonMap::new();
663 for (id, output) in &self.nodes {
664 nodes.insert(
665 id.clone(),
666 json!({
667 "ok": output.ok,
668 "payload": output.payload.clone(),
669 "meta": output.meta.clone(),
670 }),
671 );
672 }
673 json!({
674 "input": self.input.clone(),
675 "nodes": nodes,
676 })
677 }
678 fn push_egress(&mut self, payload: Value) {
679 self.egress.push(payload);
680 }
681
682 fn replace_input(&mut self, input: Value) {
683 self.input = input;
684 }
685
686 fn clear_egress(&mut self) {
687 self.egress.clear();
688 }
689
690 fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
691 if self.egress.is_empty() {
692 return final_payload.unwrap_or(Value::Null);
693 }
694 let mut emitted = std::mem::take(&mut self.egress);
695 if let Some(value) = final_payload {
696 match value {
697 Value::Null => {}
698 Value::Array(items) => emitted.extend(items),
699 other => emitted.push(other),
700 }
701 }
702 Value::Array(emitted)
703 }
704}
705
706#[derive(Clone, Debug, Serialize, Deserialize)]
707struct NodeOutput {
708 ok: bool,
709 payload: Value,
710 meta: Value,
711}
712
713impl NodeOutput {
714 fn new(payload: Value) -> Self {
715 Self {
716 ok: true,
717 payload,
718 meta: Value::Null,
719 }
720 }
721}
722
723struct DispatchOutcome {
724 output: NodeOutput,
725 wait_reason: Option<String>,
726}
727
728impl DispatchOutcome {
729 fn complete(output: NodeOutput) -> Self {
730 Self {
731 output,
732 wait_reason: None,
733 }
734 }
735
736 fn wait(output: NodeOutput, reason: Option<String>) -> Self {
737 Self {
738 output,
739 wait_reason: reason,
740 }
741 }
742}
743
744fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
745 ComponentExecCtx {
746 tenant: ComponentTenantCtx {
747 tenant: ctx.tenant.to_string(),
748 team: None,
749 user: ctx.provider_id.map(str::to_string),
750 trace_id: None,
751 correlation_id: ctx.session_id.map(str::to_string),
752 deadline_unix_ms: None,
753 attempt: 1,
754 idempotency_key: ctx.session_id.map(str::to_string),
755 },
756 flow_id: ctx.flow_id.to_string(),
757 node_id: Some(node_id.to_string()),
758 }
759}
760
761fn extract_wait_reason(payload: &Value) -> Option<String> {
762 match payload {
763 Value::String(s) => Some(s.clone()),
764 Value::Object(map) => map
765 .get("reason")
766 .and_then(Value::as_str)
767 .map(|value| value.to_string()),
768 _ => None,
769 }
770}
771
772fn mapping_ctx(root: Value, state: &ExecutionState) -> Value {
773 json!({
774 "input": root.clone(),
775 "result": root,
776 "state": state.context(),
777 })
778}
779
780fn apply_mapping(template: &Value, ctx: &Value) -> Result<Value> {
781 match template {
782 Value::String(path) if path.starts_with('/') => ctx
783 .pointer(path)
784 .cloned()
785 .ok_or_else(|| anyhow!("mapping path `{path}` not found")),
786 Value::Array(items) => {
787 let mut mapped = Vec::with_capacity(items.len());
788 for item in items {
789 mapped.push(apply_mapping(item, ctx)?);
790 }
791 Ok(Value::Array(mapped))
792 }
793 Value::Object(map) => {
794 let mut out = serde_json::Map::new();
795 for (key, value) in map {
796 out.insert(key.clone(), apply_mapping(value, ctx)?);
797 }
798 Ok(Value::Object(out))
799 }
800 other => Ok(other.clone()),
801 }
802}
803
804impl From<Flow> for HostFlow {
805 fn from(value: Flow) -> Self {
806 let mut nodes = IndexMap::new();
807 for (id, node) in value.nodes {
808 nodes.insert(id.clone(), HostNode::from(node));
809 }
810 let start = value
811 .entrypoints
812 .get("default")
813 .and_then(Value::as_str)
814 .and_then(|id| NodeId::from_str(id).ok())
815 .or_else(|| nodes.keys().next().cloned());
816 Self {
817 id: value.id.as_str().to_string(),
818 start,
819 nodes,
820 }
821 }
822}
823
824impl From<Node> for HostNode {
825 fn from(node: Node) -> Self {
826 let component_ref = node.component.id.as_str().to_string();
827 let operation_in_mapping = extract_operation_from_mapping(&node.input.mapping);
828 let kind = match component_ref.as_str() {
829 "component.exec" => {
830 let target = extract_target_component(&node.input.mapping)
831 .unwrap_or_else(|| "component.exec".to_string());
832 if target.starts_with("emit.") {
833 NodeKind::BuiltinEmit {
834 kind: emit_kind_from_ref(&target),
835 }
836 } else {
837 NodeKind::Exec {
838 target_component: target,
839 }
840 }
841 }
842 "flow.call" => NodeKind::FlowCall,
843 "provider.invoke" => NodeKind::ProviderInvoke,
844 "session.wait" => NodeKind::Wait,
845 comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
846 kind: emit_kind_from_ref(comp),
847 },
848 other => NodeKind::PackComponent {
849 component_ref: other.to_string(),
850 },
851 };
852 let component_label = match &kind {
853 NodeKind::Exec { .. } => "component.exec".to_string(),
854 NodeKind::PackComponent { component_ref } => component_ref.clone(),
855 NodeKind::ProviderInvoke => "provider.invoke".to_string(),
856 NodeKind::FlowCall => "flow.call".to_string(),
857 NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
858 NodeKind::Wait => "session.wait".to_string(),
859 };
860 let payload_expr = match kind {
861 NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
862 _ => node.input.mapping.clone(),
863 };
864 let payload_expr = match &kind {
865 NodeKind::Exec { .. } | NodeKind::PackComponent { .. } => {
868 merge_operation(payload_expr, node.component.operation.as_deref())
869 }
870 _ => payload_expr,
871 };
872 Self {
873 kind,
874 component: component_label,
875 operation_in_mapping,
876 payload_expr,
877 routing: node.routing,
878 }
879 }
880}
881
882fn extract_target_component(payload: &Value) -> Option<String> {
883 match payload {
884 Value::Object(map) => map
885 .get("component")
886 .or_else(|| map.get("component_ref"))
887 .and_then(Value::as_str)
888 .map(|s| s.to_string()),
889 _ => None,
890 }
891}
892
893fn extract_operation_from_mapping(payload: &Value) -> Option<String> {
894 match payload {
895 Value::Object(map) => map
896 .get("operation")
897 .or_else(|| map.get("op"))
898 .and_then(Value::as_str)
899 .map(str::trim)
900 .filter(|value| !value.is_empty())
901 .map(|value| value.to_string()),
902 _ => None,
903 }
904}
905
906fn extract_emit_payload(payload: &Value) -> Value {
907 if let Value::Object(map) = payload {
908 if let Some(input) = map.get("input") {
909 return input.clone();
910 }
911 if let Some(inner) = map.get("payload") {
912 return inner.clone();
913 }
914 }
915 payload.clone()
916}
917
918fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
919 match component_ref {
920 "emit.log" => EmitKind::Log,
921 "emit.response" => EmitKind::Response,
922 other => EmitKind::Other(other.to_string()),
923 }
924}
925
926fn emit_ref_from_kind(kind: &EmitKind) -> String {
927 match kind {
928 EmitKind::Log => "emit.log".to_string(),
929 EmitKind::Response => "emit.response".to_string(),
930 EmitKind::Other(other) => other.clone(),
931 }
932}
933
934fn merge_operation(payload: Value, operation: Option<&str>) -> Value {
935 let Some(op) = operation else { return payload };
936 match payload {
937 Value::Object(mut map) => {
938 let set_op = match map.get("operation") {
939 Some(Value::String(existing)) => existing.trim().is_empty(),
940 None => true,
941 _ => true,
942 };
943 if set_op {
944 map.insert("operation".into(), Value::String(op.to_string()));
945 }
946 Value::Object(map)
947 }
948 Value::Null => {
949 let mut map = JsonMap::new();
950 map.insert("operation".into(), Value::String(op.to_string()));
951 Value::Object(map)
952 }
953 other => other,
954 }
955}
956
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tokio::runtime::Runtime;

    /// Engine without packs or flows; sufficient for code paths that fail
    /// before a pack is consulted.
    fn minimal_engine() -> FlowEngine {
        FlowEngine {
            packs: Vec::new(),
            flows: Vec::new(),
            flow_sources: HashMap::new(),
            flow_cache: RwLock::new(HashMap::new()),
            default_env: "local".to_string(),
        }
    }

    /// Flow context for `node_id` with a single attempt and no observer.
    fn single_attempt_ctx(node_id: &str) -> FlowContext<'_> {
        FlowContext {
            tenant: "tenant",
            flow_id: "flow",
            node_id: Some(node_id),
            tool: None,
            action: None,
            session_id: None,
            provider_id: None,
            retry_config: RetryConfig {
                max_attempts: 1,
                base_delay_ms: 1,
            },
            observer: None,
            mocks: None,
        }
    }

    /// Runs `execute_component_exec` with a payload lacking an operation and
    /// returns the resulting error message.
    fn missing_operation_message(node_id: &str, operation_in_mapping: Option<&str>) -> String {
        let engine = minimal_engine();
        let rt = Runtime::new().unwrap();
        let ctx = single_attempt_ctx(node_id);
        let node = HostNode {
            kind: NodeKind::Exec {
                target_component: "qa.process".into(),
            },
            component: "component.exec".into(),
            operation_in_mapping: operation_in_mapping.map(Into::into),
            payload_expr: Value::Null,
            routing: Routing::End,
        };
        let state = ExecutionState::new(Value::Null);
        let payload = json!({ "component": "qa.process" });
        let err = rt
            .block_on(engine.execute_component_exec(&ctx, node_id, &node, &state, payload, None))
            .unwrap_err();
        err.to_string()
    }

    #[test]
    fn templating_renders_with_partials_and_data() {
        let mut state = ExecutionState::new(json!({ "city": "London" }));
        let forecast = NodeOutput::new(json!({ "temp": "20C" }));
        state.nodes.insert("forecast".to_string(), forecast);

        let ctx = state.context();
        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
    }

    #[test]
    fn finalize_wraps_emitted_payloads() {
        let mut state = ExecutionState::new(json!({}));
        for text in ["first", "second"] {
            state.push_egress(json!({ "text": text }));
        }
        let result = state.finalize_with(Some(json!({ "text": "final" })));
        let expected = json!([
            { "text": "first" },
            { "text": "second" },
            { "text": "final" }
        ]);
        assert_eq!(result, expected);
    }

    #[test]
    fn finalize_flattens_final_array() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "only" }));
        let trailing = json!([
            { "text": "extra-1" },
            { "text": "extra-2" }
        ]);
        let result = state.finalize_with(Some(trailing));
        let expected = json!([
            { "text": "only" },
            { "text": "extra-1" },
            { "text": "extra-2" }
        ]);
        assert_eq!(result, expected);
    }

    #[test]
    fn missing_operation_reports_node_and_component() {
        let message = missing_operation_message("missing-op", None);
        assert!(
            message.contains("missing operation for node `missing-op`"),
            "unexpected message: {message}"
        );
        assert!(
            message.contains("(component `component.exec`)"),
            "unexpected message: {message}"
        );
    }

    #[test]
    fn missing_operation_mentions_mapping_hint() {
        let message = missing_operation_message("missing-op-hint", Some("render"));
        assert!(
            message.contains("missing operation for node `missing-op-hint`"),
            "unexpected message: {message}"
        );
        assert!(
            message.contains("Found operation in input.mapping (`render`)"),
            "unexpected message: {message}"
        );
    }
}
1124
1125use tracing::Instrument;
1126
1127pub struct FlowContext<'a> {
1128 pub tenant: &'a str,
1129 pub flow_id: &'a str,
1130 pub node_id: Option<&'a str>,
1131 pub tool: Option<&'a str>,
1132 pub action: Option<&'a str>,
1133 pub session_id: Option<&'a str>,
1134 pub provider_id: Option<&'a str>,
1135 pub retry_config: RetryConfig,
1136 pub observer: Option<&'a dyn ExecutionObserver>,
1137 pub mocks: Option<&'a MockLayer>,
1138}
1139
/// Retry policy for flow execution.
///
/// Plain data: cheap to copy, printable for diagnostics, and comparable in
/// tests.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct RetryConfig {
    /// Maximum number of attempts, counting the first one.
    pub max_attempts: u32,
    /// Base delay in milliseconds fed to the backoff computation.
    pub base_delay_ms: u64,
}
1145
1146fn should_retry(err: &anyhow::Error) -> bool {
1147 let lower = err.to_string().to_lowercase();
1148 lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
1149}
1150
1151impl From<FlowRetryConfig> for RetryConfig {
1152 fn from(value: FlowRetryConfig) -> Self {
1153 Self {
1154 max_attempts: value.max_attempts.max(1),
1155 base_delay_ms: value.base_delay_ms.max(50),
1156 }
1157 }
1158}