1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use crate::config::{FlowRetryConfig, HostConfig};
18use crate::pack::{FlowDescriptor, PackRuntime};
19use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
20use greentic_types::{Flow, Node, NodeId, Routing};
21
22pub struct FlowEngine {
23 packs: Vec<Arc<PackRuntime>>,
24 flows: Vec<FlowDescriptor>,
25 flow_sources: HashMap<String, usize>,
26 flow_cache: RwLock<HashMap<String, HostFlow>>,
27 default_env: String,
28}
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
31pub struct FlowSnapshot {
32 pub flow_id: String,
33 pub next_node: String,
34 pub state: ExecutionState,
35}
36
37#[derive(Clone, Debug)]
38pub struct FlowWait {
39 pub reason: Option<String>,
40 pub snapshot: FlowSnapshot,
41}
42
43#[derive(Clone, Debug)]
44pub enum FlowStatus {
45 Completed,
46 Waiting(FlowWait),
47}
48
49#[derive(Clone, Debug)]
50pub struct FlowExecution {
51 pub output: Value,
52 pub status: FlowStatus,
53}
54
55#[derive(Clone, Debug)]
56struct HostFlow {
57 id: String,
58 start: Option<NodeId>,
59 nodes: IndexMap<NodeId, HostNode>,
60}
61
62#[derive(Clone, Debug)]
63pub struct HostNode {
64 kind: NodeKind,
65 pub component: String,
67 component_id: String,
68 operation_name: Option<String>,
69 operation_in_mapping: Option<String>,
70 payload_expr: Value,
71 routing: Routing,
72}
73
74#[derive(Clone, Debug)]
75enum NodeKind {
76 Exec { target_component: String },
77 PackComponent { component_ref: String },
78 ProviderInvoke,
79 FlowCall,
80 BuiltinEmit { kind: EmitKind },
81 Wait,
82}
83
/// Flavour of a builtin `emit.*` node.
#[derive(Debug, Clone)]
enum EmitKind {
    /// `emit.log`
    Log,
    /// `emit.response`
    Response,
    /// Any other `emit.*` reference, kept verbatim.
    Other(String),
}
90
/// Values taken from the node definition that take precedence over the payload.
struct ComponentOverrides<'a> {
    /// Component reference to use instead of the one in the payload.
    component: Option<&'a str>,
    /// Operation name to use instead of the one in the payload.
    operation: Option<&'a str>,
}
95
96struct ComponentCall {
97 component_ref: String,
98 operation: String,
99 input: Value,
100 config: Value,
101}
102
103impl FlowExecution {
104 fn completed(output: Value) -> Self {
105 Self {
106 output,
107 status: FlowStatus::Completed,
108 }
109 }
110
111 fn waiting(output: Value, wait: FlowWait) -> Self {
112 Self {
113 output,
114 status: FlowStatus::Waiting(wait),
115 }
116 }
117}
118
119impl FlowEngine {
120 pub async fn new(packs: Vec<Arc<PackRuntime>>, _config: Arc<HostConfig>) -> Result<Self> {
121 let mut flow_sources = HashMap::new();
122 let mut descriptors = Vec::new();
123 for (idx, pack) in packs.iter().enumerate() {
124 let flows = pack.list_flows().await?;
125 for flow in flows {
126 tracing::info!(
127 flow_id = %flow.id,
128 flow_type = %flow.flow_type,
129 pack_index = idx,
130 "registered flow"
131 );
132 flow_sources.insert(flow.id.clone(), idx);
133 descriptors.retain(|existing: &FlowDescriptor| existing.id != flow.id);
134 descriptors.push(flow);
135 }
136 }
137
138 let mut flow_map = HashMap::new();
139 for flow in &descriptors {
140 if let Some(&pack_idx) = flow_sources.get(&flow.id) {
141 let pack_clone = Arc::clone(&packs[pack_idx]);
142 let flow_id = flow.id.clone();
143 let task_flow_id = flow_id.clone();
144 match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
145 Ok(Ok(flow)) => {
146 flow_map.insert(flow_id, HostFlow::from(flow));
147 }
148 Ok(Err(err)) => {
149 tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
150 }
151 Err(err) => {
152 tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
153 }
154 }
155 }
156 }
157
158 Ok(Self {
159 packs,
160 flows: descriptors,
161 flow_sources,
162 flow_cache: RwLock::new(flow_map),
163 default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
164 })
165 }
166
167 async fn get_or_load_flow(&self, flow_id: &str) -> Result<HostFlow> {
168 if let Some(flow) = self.flow_cache.read().get(flow_id).cloned() {
169 return Ok(flow);
170 }
171
172 let pack_idx = *self
173 .flow_sources
174 .get(flow_id)
175 .with_context(|| format!("flow {flow_id} not registered"))?;
176 let pack = Arc::clone(&self.packs[pack_idx]);
177 let flow_id_owned = flow_id.to_string();
178 let task_flow_id = flow_id_owned.clone();
179 let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
180 .await
181 .context("failed to join flow metadata task")??;
182 let host_flow = HostFlow::from(flow);
183 self.flow_cache
184 .write()
185 .insert(flow_id_owned.clone(), host_flow.clone());
186 Ok(host_flow)
187 }
188
189 pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
190 let span = tracing::info_span!(
191 "flow.execute",
192 tenant = tracing::field::Empty,
193 flow_id = tracing::field::Empty,
194 node_id = tracing::field::Empty,
195 tool = tracing::field::Empty,
196 action = tracing::field::Empty
197 );
198 annotate_span(
199 &span,
200 &FlowSpanAttributes {
201 tenant: ctx.tenant,
202 flow_id: ctx.flow_id,
203 node_id: ctx.node_id,
204 tool: ctx.tool,
205 action: ctx.action,
206 },
207 );
208 set_flow_context(
209 &self.default_env,
210 ctx.tenant,
211 ctx.flow_id,
212 ctx.node_id,
213 ctx.provider_id,
214 ctx.session_id,
215 );
216 let retry_config = ctx.retry_config;
217 let original_input = input;
218 async move {
219 let mut attempt = 0u32;
220 loop {
221 attempt += 1;
222 match self.execute_once(&ctx, original_input.clone()).await {
223 Ok(value) => return Ok(value),
224 Err(err) => {
225 if attempt >= retry_config.max_attempts || !should_retry(&err) {
226 return Err(err);
227 }
228 let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
229 tracing::warn!(
230 tenant = ctx.tenant,
231 flow_id = ctx.flow_id,
232 attempt,
233 max_attempts = retry_config.max_attempts,
234 delay_ms = delay,
235 error = %err,
236 "transient flow execution failure, backing off"
237 );
238 tokio::time::sleep(Duration::from_millis(delay)).await;
239 }
240 }
241 }
242 }
243 .instrument(span)
244 .await
245 }
246
247 pub async fn resume(
248 &self,
249 ctx: FlowContext<'_>,
250 snapshot: FlowSnapshot,
251 input: Value,
252 ) -> Result<FlowExecution> {
253 if snapshot.flow_id != ctx.flow_id {
254 bail!(
255 "snapshot flow {} does not match requested {}",
256 snapshot.flow_id,
257 ctx.flow_id
258 );
259 }
260 let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
261 let mut state = snapshot.state;
262 state.replace_input(input);
263 self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
264 .await
265 }
266
267 async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
268 let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
269 let state = ExecutionState::new(input);
270 self.drive_flow(ctx, flow_ir, state, None).await
271 }
272
273 async fn drive_flow(
274 &self,
275 ctx: &FlowContext<'_>,
276 flow_ir: HostFlow,
277 mut state: ExecutionState,
278 resume_from: Option<String>,
279 ) -> Result<FlowExecution> {
280 let mut current = match resume_from {
281 Some(node) => NodeId::from_str(&node)
282 .with_context(|| format!("invalid resume node id `{node}`"))?,
283 None => flow_ir
284 .start
285 .clone()
286 .or_else(|| flow_ir.nodes.keys().next().cloned())
287 .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
288 };
289
290 loop {
291 let node = flow_ir
292 .nodes
293 .get(¤t)
294 .with_context(|| format!("node {} not found", current.as_str()))?;
295
296 let payload = node.payload_expr.clone();
297 let observed_payload = payload.clone();
298 let node_id = current.clone();
299 let event = NodeEvent {
300 context: ctx,
301 node_id: node_id.as_str(),
302 node,
303 payload: &observed_payload,
304 };
305 if let Some(observer) = ctx.observer {
306 observer.on_node_start(&event);
307 }
308 let DispatchOutcome {
309 output,
310 wait_reason,
311 } = self
312 .dispatch_node(ctx, node_id.as_str(), node, &mut state, payload)
313 .await?;
314
315 state.nodes.insert(node_id.clone().into(), output.clone());
316
317 let (next, should_exit) = match &node.routing {
318 Routing::Next { node_id } => (Some(node_id.clone()), false),
319 Routing::End | Routing::Reply => (None, true),
320 Routing::Branch { default, .. } => (default.clone(), default.is_none()),
321 Routing::Custom(raw) => {
322 tracing::warn!(
323 flow_id = %flow_ir.id,
324 node_id = %node_id,
325 routing = ?raw,
326 "unsupported routing; terminating flow"
327 );
328 (None, true)
329 }
330 };
331
332 if let Some(wait_reason) = wait_reason {
333 let resume_target = next.clone().ok_or_else(|| {
334 anyhow!(
335 "session.wait node {} requires a non-empty route",
336 current.as_str()
337 )
338 })?;
339 let mut snapshot_state = state.clone();
340 snapshot_state.clear_egress();
341 let snapshot = FlowSnapshot {
342 flow_id: ctx.flow_id.to_string(),
343 next_node: resume_target.as_str().to_string(),
344 state: snapshot_state,
345 };
346 let output_value = state.clone().finalize_with(None);
347 return Ok(FlowExecution::waiting(
348 output_value,
349 FlowWait {
350 reason: Some(wait_reason),
351 snapshot,
352 },
353 ));
354 }
355
356 if should_exit {
357 return Ok(FlowExecution::completed(
358 state.finalize_with(Some(output.payload.clone())),
359 ));
360 }
361
362 match next {
363 Some(n) => current = n,
364 None => {
365 return Ok(FlowExecution::completed(
366 state.finalize_with(Some(output.payload.clone())),
367 ));
368 }
369 }
370 }
371 }
372
373 async fn dispatch_node(
374 &self,
375 ctx: &FlowContext<'_>,
376 node_id: &str,
377 node: &HostNode,
378 state: &mut ExecutionState,
379 payload: Value,
380 ) -> Result<DispatchOutcome> {
381 match &node.kind {
382 NodeKind::Exec { target_component } => self
383 .execute_component_exec(
384 ctx,
385 node_id,
386 node,
387 state,
388 payload,
389 ComponentOverrides {
390 component: Some(target_component.as_str()),
391 operation: node.operation_name.as_deref(),
392 },
393 )
394 .await
395 .map(DispatchOutcome::complete),
396 NodeKind::PackComponent { component_ref } => self
397 .execute_component_call(ctx, node_id, node, state, payload, component_ref.as_str())
398 .await
399 .map(DispatchOutcome::complete),
400 NodeKind::FlowCall => self
401 .execute_flow_call(ctx, payload)
402 .await
403 .map(DispatchOutcome::complete),
404 NodeKind::ProviderInvoke => self
405 .execute_provider_invoke(ctx, node_id, state, payload)
406 .await
407 .map(DispatchOutcome::complete),
408 NodeKind::BuiltinEmit { kind } => {
409 match kind {
410 EmitKind::Log | EmitKind::Response => {}
411 EmitKind::Other(component) => {
412 tracing::debug!(%component, "handling emit.* as builtin");
413 }
414 }
415 state.push_egress(payload.clone());
416 Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
417 }
418 NodeKind::Wait => {
419 let reason = extract_wait_reason(&payload);
420 Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
421 }
422 }
423 }
424
425 async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
426 #[derive(Deserialize)]
427 struct FlowCallPayload {
428 #[serde(alias = "flow")]
429 flow_id: String,
430 #[serde(default)]
431 input: Value,
432 }
433
434 let call: FlowCallPayload =
435 serde_json::from_value(payload).context("invalid payload for flow.call node")?;
436 if call.flow_id.trim().is_empty() {
437 bail!("flow.call requires a non-empty flow_id");
438 }
439
440 let sub_input = if call.input.is_null() {
441 Value::Null
442 } else {
443 call.input
444 };
445
446 let flow_id_owned = call.flow_id;
447 let action = "flow.call";
448 let sub_ctx = FlowContext {
449 tenant: ctx.tenant,
450 flow_id: flow_id_owned.as_str(),
451 node_id: None,
452 tool: ctx.tool,
453 action: Some(action),
454 session_id: ctx.session_id,
455 provider_id: ctx.provider_id,
456 retry_config: ctx.retry_config,
457 observer: ctx.observer,
458 mocks: ctx.mocks,
459 };
460
461 let execution = Box::pin(self.execute(sub_ctx, sub_input))
462 .await
463 .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
464 match execution.status {
465 FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
466 FlowStatus::Waiting(wait) => bail!(
467 "flow.call cannot pause (flow {} waiting {:?})",
468 flow_id_owned,
469 wait.reason
470 ),
471 }
472 }
473
474 async fn execute_component_exec(
475 &self,
476 ctx: &FlowContext<'_>,
477 node_id: &str,
478 node: &HostNode,
479 state: &ExecutionState,
480 payload: Value,
481 overrides: ComponentOverrides<'_>,
482 ) -> Result<NodeOutput> {
483 #[derive(Deserialize)]
484 struct ComponentPayload {
485 #[serde(default, alias = "component_ref", alias = "component")]
486 component: Option<String>,
487 #[serde(alias = "op")]
488 operation: Option<String>,
489 #[serde(default)]
490 input: Value,
491 #[serde(default)]
492 config: Value,
493 }
494
495 let payload: ComponentPayload =
496 serde_json::from_value(payload).context("invalid payload for component.exec")?;
497 let component_ref = overrides
498 .component
499 .map(str::to_string)
500 .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
501 .with_context(|| "component.exec requires a component_ref")?;
502 let operation = resolve_component_operation(
503 node_id,
504 node.component_id.as_str(),
505 payload.operation,
506 overrides.operation,
507 node.operation_in_mapping.as_deref(),
508 )?;
509 let call = ComponentCall {
510 component_ref,
511 operation,
512 input: payload.input,
513 config: payload.config,
514 };
515
516 self.invoke_component_call(ctx, node_id, state, call).await
517 }
518
519 async fn execute_component_call(
520 &self,
521 ctx: &FlowContext<'_>,
522 node_id: &str,
523 node: &HostNode,
524 state: &ExecutionState,
525 payload: Value,
526 component_ref: &str,
527 ) -> Result<NodeOutput> {
528 let payload_operation = extract_operation_from_mapping(&payload);
529 let (input, config) = split_operation_payload(payload);
530 let operation = resolve_component_operation(
531 node_id,
532 node.component_id.as_str(),
533 payload_operation,
534 node.operation_name.as_deref(),
535 node.operation_in_mapping.as_deref(),
536 )?;
537 let call = ComponentCall {
538 component_ref: component_ref.to_string(),
539 operation,
540 input,
541 config,
542 };
543 self.invoke_component_call(ctx, node_id, state, call).await
544 }
545
546 async fn invoke_component_call(
547 &self,
548 ctx: &FlowContext<'_>,
549 node_id: &str,
550 state: &ExecutionState,
551 mut call: ComponentCall,
552 ) -> Result<NodeOutput> {
553 if let Value::Object(ref mut map) = call.input {
554 map.entry("state".to_string())
555 .or_insert_with(|| state.context());
556 }
557 let input_json = serde_json::to_string(&call.input)?;
558 let config_json = if call.config.is_null() {
559 None
560 } else {
561 Some(serde_json::to_string(&call.config)?)
562 };
563
564 let pack_idx = *self
565 .flow_sources
566 .get(ctx.flow_id)
567 .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
568 let pack = Arc::clone(&self.packs[pack_idx]);
569 let exec_ctx = component_exec_ctx(ctx, node_id);
570 let value = pack
571 .invoke_component(
572 call.component_ref.as_str(),
573 exec_ctx,
574 call.operation.as_str(),
575 config_json,
576 input_json,
577 )
578 .await?;
579
580 Ok(NodeOutput::new(value))
581 }
582
583 async fn execute_provider_invoke(
584 &self,
585 ctx: &FlowContext<'_>,
586 node_id: &str,
587 state: &ExecutionState,
588 payload: Value,
589 ) -> Result<NodeOutput> {
590 #[derive(Deserialize)]
591 struct ProviderPayload {
592 #[serde(default)]
593 provider_id: Option<String>,
594 #[serde(default)]
595 provider_type: Option<String>,
596 #[serde(default, alias = "operation")]
597 op: Option<String>,
598 #[serde(default)]
599 input: Value,
600 #[serde(default)]
601 in_map: Value,
602 #[serde(default)]
603 out_map: Value,
604 #[serde(default)]
605 err_map: Value,
606 }
607
608 let payload: ProviderPayload =
609 serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
610 let op = payload
611 .op
612 .as_deref()
613 .filter(|v| !v.trim().is_empty())
614 .with_context(|| "provider.invoke requires an op")?
615 .to_string();
616
617 let mut input_value = if !payload.in_map.is_null() {
618 let ctx_value = mapping_ctx(payload.input.clone(), state);
619 apply_mapping(&payload.in_map, &ctx_value)
620 .context("failed to evaluate provider.invoke in_map")?
621 } else if !payload.input.is_null() {
622 payload.input
623 } else {
624 Value::Null
625 };
626
627 if let Value::Object(ref mut map) = input_value {
628 map.entry("state".to_string())
629 .or_insert_with(|| state.context());
630 }
631 let input_json = serde_json::to_vec(&input_value)?;
632
633 let pack_idx = *self
634 .flow_sources
635 .get(ctx.flow_id)
636 .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
637 let pack = Arc::clone(&self.packs[pack_idx]);
638 let binding = pack.resolve_provider(
639 payload.provider_id.as_deref(),
640 payload.provider_type.as_deref(),
641 )?;
642 let exec_ctx = component_exec_ctx(ctx, node_id);
643 let result = pack
644 .invoke_provider(&binding, exec_ctx, &op, input_json)
645 .await?;
646
647 let output = if payload.out_map.is_null() {
648 result
649 } else {
650 let ctx_value = mapping_ctx(result, state);
651 apply_mapping(&payload.out_map, &ctx_value)
652 .context("failed to evaluate provider.invoke out_map")?
653 };
654 let _ = payload.err_map;
655 Ok(NodeOutput::new(output))
656 }
657
658 pub fn flows(&self) -> &[FlowDescriptor] {
659 &self.flows
660 }
661
662 pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
663 self.flows
664 .iter()
665 .find(|descriptor| descriptor.flow_type == flow_type)
666 }
667
668 pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
669 self.flows
670 .iter()
671 .find(|descriptor| descriptor.id == flow_id)
672 }
673}
674
675pub trait ExecutionObserver: Send + Sync {
676 fn on_node_start(&self, event: &NodeEvent<'_>);
677 fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
678 fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
679}
680
681pub struct NodeEvent<'a> {
682 pub context: &'a FlowContext<'a>,
683 pub node_id: &'a str,
684 pub node: &'a HostNode,
685 pub payload: &'a Value,
686}
687
688#[derive(Clone, Debug, Serialize, Deserialize)]
689pub struct ExecutionState {
690 input: Value,
691 nodes: HashMap<String, NodeOutput>,
692 egress: Vec<Value>,
693}
694
695impl ExecutionState {
696 fn new(input: Value) -> Self {
697 Self {
698 input,
699 nodes: HashMap::new(),
700 egress: Vec::new(),
701 }
702 }
703
704 fn context(&self) -> Value {
705 let mut nodes = JsonMap::new();
706 for (id, output) in &self.nodes {
707 nodes.insert(
708 id.clone(),
709 json!({
710 "ok": output.ok,
711 "payload": output.payload.clone(),
712 "meta": output.meta.clone(),
713 }),
714 );
715 }
716 json!({
717 "input": self.input.clone(),
718 "nodes": nodes,
719 })
720 }
721 fn push_egress(&mut self, payload: Value) {
722 self.egress.push(payload);
723 }
724
725 fn replace_input(&mut self, input: Value) {
726 self.input = input;
727 }
728
729 fn clear_egress(&mut self) {
730 self.egress.clear();
731 }
732
733 fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
734 if self.egress.is_empty() {
735 return final_payload.unwrap_or(Value::Null);
736 }
737 let mut emitted = std::mem::take(&mut self.egress);
738 if let Some(value) = final_payload {
739 match value {
740 Value::Null => {}
741 Value::Array(items) => emitted.extend(items),
742 other => emitted.push(other),
743 }
744 }
745 Value::Array(emitted)
746 }
747}
748
749#[derive(Clone, Debug, Serialize, Deserialize)]
750struct NodeOutput {
751 ok: bool,
752 payload: Value,
753 meta: Value,
754}
755
756impl NodeOutput {
757 fn new(payload: Value) -> Self {
758 Self {
759 ok: true,
760 payload,
761 meta: Value::Null,
762 }
763 }
764}
765
766struct DispatchOutcome {
767 output: NodeOutput,
768 wait_reason: Option<String>,
769}
770
771impl DispatchOutcome {
772 fn complete(output: NodeOutput) -> Self {
773 Self {
774 output,
775 wait_reason: None,
776 }
777 }
778
779 fn wait(output: NodeOutput, reason: Option<String>) -> Self {
780 Self {
781 output,
782 wait_reason: reason,
783 }
784 }
785}
786
787fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
788 ComponentExecCtx {
789 tenant: ComponentTenantCtx {
790 tenant: ctx.tenant.to_string(),
791 team: None,
792 user: ctx.provider_id.map(str::to_string),
793 trace_id: None,
794 correlation_id: ctx.session_id.map(str::to_string),
795 deadline_unix_ms: None,
796 attempt: 1,
797 idempotency_key: ctx.session_id.map(str::to_string),
798 },
799 flow_id: ctx.flow_id.to_string(),
800 node_id: Some(node_id.to_string()),
801 }
802}
803
804fn extract_wait_reason(payload: &Value) -> Option<String> {
805 match payload {
806 Value::String(s) => Some(s.clone()),
807 Value::Object(map) => map
808 .get("reason")
809 .and_then(Value::as_str)
810 .map(|value| value.to_string()),
811 _ => None,
812 }
813}
814
815fn mapping_ctx(root: Value, state: &ExecutionState) -> Value {
816 json!({
817 "input": root.clone(),
818 "result": root,
819 "state": state.context(),
820 })
821}
822
823fn apply_mapping(template: &Value, ctx: &Value) -> Result<Value> {
824 match template {
825 Value::String(path) if path.starts_with('/') => ctx
826 .pointer(path)
827 .cloned()
828 .ok_or_else(|| anyhow!("mapping path `{path}` not found")),
829 Value::Array(items) => {
830 let mut mapped = Vec::with_capacity(items.len());
831 for item in items {
832 mapped.push(apply_mapping(item, ctx)?);
833 }
834 Ok(Value::Array(mapped))
835 }
836 Value::Object(map) => {
837 let mut out = serde_json::Map::new();
838 for (key, value) in map {
839 out.insert(key.clone(), apply_mapping(value, ctx)?);
840 }
841 Ok(Value::Object(out))
842 }
843 other => Ok(other.clone()),
844 }
845}
846
847impl From<Flow> for HostFlow {
848 fn from(value: Flow) -> Self {
849 let mut nodes = IndexMap::new();
850 for (id, node) in value.nodes {
851 nodes.insert(id.clone(), HostNode::from(node));
852 }
853 let start = value
854 .entrypoints
855 .get("default")
856 .and_then(Value::as_str)
857 .and_then(|id| NodeId::from_str(id).ok())
858 .or_else(|| nodes.keys().next().cloned());
859 Self {
860 id: value.id.as_str().to_string(),
861 start,
862 nodes,
863 }
864 }
865}
866
867impl From<Node> for HostNode {
868 fn from(node: Node) -> Self {
869 let component_ref = node.component.id.as_str().to_string();
870 let raw_operation = node.component.operation.clone();
871 let operation_in_mapping = extract_operation_from_mapping(&node.input.mapping);
872 let operation_is_component_exec = raw_operation.as_deref() == Some("component.exec");
873 let operation_is_emit = raw_operation
874 .as_deref()
875 .map(|op| op.starts_with("emit."))
876 .unwrap_or(false);
877 let is_component_exec = component_ref == "component.exec" || operation_is_component_exec;
878
879 let kind = if is_component_exec {
880 let target = if component_ref == "component.exec" {
881 if let Some(op) = raw_operation
882 .as_deref()
883 .filter(|op| op.starts_with("emit."))
884 {
885 op.to_string()
886 } else {
887 extract_target_component(&node.input.mapping)
888 .unwrap_or_else(|| "component.exec".to_string())
889 }
890 } else {
891 extract_target_component(&node.input.mapping)
892 .unwrap_or_else(|| component_ref.clone())
893 };
894 if target.starts_with("emit.") {
895 NodeKind::BuiltinEmit {
896 kind: emit_kind_from_ref(&target),
897 }
898 } else {
899 NodeKind::Exec {
900 target_component: target,
901 }
902 }
903 } else if operation_is_emit {
904 NodeKind::BuiltinEmit {
905 kind: emit_kind_from_ref(raw_operation.as_deref().unwrap_or("emit.log")),
906 }
907 } else {
908 match component_ref.as_str() {
909 "flow.call" => NodeKind::FlowCall,
910 "provider.invoke" => NodeKind::ProviderInvoke,
911 "session.wait" => NodeKind::Wait,
912 comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
913 kind: emit_kind_from_ref(comp),
914 },
915 other => NodeKind::PackComponent {
916 component_ref: other.to_string(),
917 },
918 }
919 };
920 let component_label = match &kind {
921 NodeKind::Exec { .. } => "component.exec".to_string(),
922 NodeKind::PackComponent { component_ref } => component_ref.clone(),
923 NodeKind::ProviderInvoke => "provider.invoke".to_string(),
924 NodeKind::FlowCall => "flow.call".to_string(),
925 NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
926 NodeKind::Wait => "session.wait".to_string(),
927 };
928 let operation_name = if is_component_exec && operation_is_component_exec {
929 None
930 } else {
931 raw_operation.clone()
932 };
933 let payload_expr = match kind {
934 NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
935 _ => node.input.mapping.clone(),
936 };
937 Self {
938 kind,
939 component: component_label,
940 component_id: if is_component_exec {
941 "component.exec".to_string()
942 } else {
943 component_ref
944 },
945 operation_name,
946 operation_in_mapping,
947 payload_expr,
948 routing: node.routing,
949 }
950 }
951}
952
953fn extract_target_component(payload: &Value) -> Option<String> {
954 match payload {
955 Value::Object(map) => map
956 .get("component")
957 .or_else(|| map.get("component_ref"))
958 .and_then(Value::as_str)
959 .map(|s| s.to_string()),
960 _ => None,
961 }
962}
963
964fn extract_operation_from_mapping(payload: &Value) -> Option<String> {
965 match payload {
966 Value::Object(map) => map
967 .get("operation")
968 .or_else(|| map.get("op"))
969 .and_then(Value::as_str)
970 .map(str::trim)
971 .filter(|value| !value.is_empty())
972 .map(|value| value.to_string()),
973 _ => None,
974 }
975}
976
977fn extract_emit_payload(payload: &Value) -> Value {
978 if let Value::Object(map) = payload {
979 if let Some(input) = map.get("input") {
980 return input.clone();
981 }
982 if let Some(inner) = map.get("payload") {
983 return inner.clone();
984 }
985 }
986 payload.clone()
987}
988
989fn split_operation_payload(payload: Value) -> (Value, Value) {
990 if let Value::Object(mut map) = payload.clone()
991 && map.contains_key("input")
992 {
993 let input = map.remove("input").unwrap_or(Value::Null);
994 let config = map.remove("config").unwrap_or(Value::Null);
995 let legacy_only = map.keys().all(|key| {
996 matches!(
997 key.as_str(),
998 "operation" | "op" | "component" | "component_ref"
999 )
1000 });
1001 if legacy_only {
1002 return (input, config);
1003 }
1004 }
1005 (payload, Value::Null)
1006}
1007
1008fn resolve_component_operation(
1009 node_id: &str,
1010 component_label: &str,
1011 payload_operation: Option<String>,
1012 operation_override: Option<&str>,
1013 operation_in_mapping: Option<&str>,
1014) -> Result<String> {
1015 if let Some(op) = operation_override
1016 .map(str::trim)
1017 .filter(|value| !value.is_empty())
1018 {
1019 return Ok(op.to_string());
1020 }
1021
1022 if let Some(op) = payload_operation
1023 .as_deref()
1024 .map(str::trim)
1025 .filter(|value| !value.is_empty())
1026 {
1027 return Ok(op.to_string());
1028 }
1029
1030 let mut message = format!(
1031 "missing operation for node `{}` (component `{}`); expected node.component.operation to be set",
1032 node_id, component_label,
1033 );
1034 if let Some(found) = operation_in_mapping {
1035 message.push_str(&format!(
1036 ". Found operation in input.mapping (`{}`) but this is not used; pack compiler must preserve node.component.operation.",
1037 found
1038 ));
1039 }
1040 bail!(message);
1041}
1042
1043fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
1044 match component_ref {
1045 "emit.log" => EmitKind::Log,
1046 "emit.response" => EmitKind::Response,
1047 other => EmitKind::Other(other.to_string()),
1048 }
1049}
1050
1051fn emit_ref_from_kind(kind: &EmitKind) -> String {
1052 match kind {
1053 EmitKind::Log => "emit.log".to_string(),
1054 EmitKind::Response => "emit.response".to_string(),
1055 EmitKind::Other(other) => other.clone(),
1056 }
1057}
1058
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tokio::runtime::Runtime;

    /// Engine with no packs or flows; enough for code paths that fail before
    /// any pack is consulted.
    fn minimal_engine() -> FlowEngine {
        FlowEngine {
            packs: Vec::new(),
            flows: Vec::new(),
            flow_sources: HashMap::new(),
            flow_cache: RwLock::new(HashMap::new()),
            default_env: "local".to_string(),
        }
    }

    /// Flow context for flow `flow` of tenant `tenant` with a single attempt.
    fn test_context(node_id: &'static str) -> FlowContext<'static> {
        FlowContext {
            tenant: "tenant",
            flow_id: "flow",
            node_id: Some(node_id),
            tool: None,
            action: None,
            session_id: None,
            provider_id: None,
            retry_config: RetryConfig {
                max_attempts: 1,
                base_delay_ms: 1,
            },
            observer: None,
            mocks: None,
        }
    }

    /// Exec node targeting `qa.process` that declares no operation.
    fn exec_node_without_operation(operation_in_mapping: Option<&str>) -> HostNode {
        HostNode {
            kind: NodeKind::Exec {
                target_component: "qa.process".into(),
            },
            component: "component.exec".into(),
            component_id: "component.exec".into(),
            operation_name: None,
            operation_in_mapping: operation_in_mapping.map(str::to_string),
            payload_expr: Value::Null,
            routing: Routing::End,
        }
    }

    /// Runs `execute_component_exec` without any operation and returns the error text.
    fn missing_operation_message(
        node_id: &'static str,
        operation_in_mapping: Option<&str>,
    ) -> String {
        let engine = minimal_engine();
        let rt = Runtime::new().expect("tokio runtime for test");
        let ctx = test_context(node_id);
        let node = exec_node_without_operation(operation_in_mapping);
        let state = ExecutionState::new(Value::Null);
        let payload = json!({ "component": "qa.process" });
        let err = rt
            .block_on(engine.execute_component_exec(
                &ctx,
                node_id,
                &node,
                &state,
                payload,
                ComponentOverrides {
                    component: None,
                    operation: None,
                },
            ))
            .unwrap_err();
        err.to_string()
    }

    #[test]
    fn templating_renders_with_partials_and_data() {
        let mut state = ExecutionState::new(json!({ "city": "London" }));
        state.nodes.insert(
            "forecast".to_string(),
            NodeOutput::new(json!({ "temp": "20C" })),
        );

        let ctx = state.context();
        assert_eq!(ctx["input"]["city"], json!("London"));
        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
    }

    #[test]
    fn finalize_wraps_emitted_payloads() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "first" }));
        state.push_egress(json!({ "text": "second" }));
        let result = state.finalize_with(Some(json!({ "text": "final" })));
        assert_eq!(
            result,
            json!([
                { "text": "first" },
                { "text": "second" },
                { "text": "final" }
            ])
        );
    }

    #[test]
    fn finalize_flattens_final_array() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "only" }));
        let result = state.finalize_with(Some(json!([
            { "text": "extra-1" },
            { "text": "extra-2" }
        ])));
        assert_eq!(
            result,
            json!([
                { "text": "only" },
                { "text": "extra-1" },
                { "text": "extra-2" }
            ])
        );
    }

    #[test]
    fn missing_operation_reports_node_and_component() {
        let message = missing_operation_message("missing-op", None);
        assert!(
            message.contains("missing operation for node `missing-op`"),
            "unexpected message: {message}"
        );
        assert!(
            message.contains("(component `component.exec`)"),
            "unexpected message: {message}"
        );
    }

    #[test]
    fn missing_operation_mentions_mapping_hint() {
        let message = missing_operation_message("missing-op-hint", Some("render"));
        assert!(
            message.contains("missing operation for node `missing-op-hint`"),
            "unexpected message: {message}"
        );
        assert!(
            message.contains("Found operation in input.mapping (`render`)"),
            "unexpected message: {message}"
        );
    }
}
1236
1237use tracing::Instrument;
1238
1239pub struct FlowContext<'a> {
1240 pub tenant: &'a str,
1241 pub flow_id: &'a str,
1242 pub node_id: Option<&'a str>,
1243 pub tool: Option<&'a str>,
1244 pub action: Option<&'a str>,
1245 pub session_id: Option<&'a str>,
1246 pub provider_id: Option<&'a str>,
1247 pub retry_config: RetryConfig,
1248 pub observer: Option<&'a dyn ExecutionObserver>,
1249 pub mocks: Option<&'a MockLayer>,
1250}
1251
/// Retry settings for a flow run.
#[derive(Clone, Copy)]
pub struct RetryConfig {
    /// Maximum number of attempts, counting the first one.
    pub max_attempts: u32,
    /// Base delay in milliseconds fed into the backoff computation.
    pub base_delay_ms: u64,
}
1257
1258fn should_retry(err: &anyhow::Error) -> bool {
1259 let lower = err.to_string().to_lowercase();
1260 lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
1261}
1262
1263impl From<FlowRetryConfig> for RetryConfig {
1264 fn from(value: FlowRetryConfig) -> Self {
1265 Self {
1266 max_attempts: value.max_attempts.max(1),
1267 base_delay_ms: value.base_delay_ms.max(50),
1268 }
1269 }
1270}