1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use super::templating::{TemplateOptions, render_template_value};
18use crate::config::{FlowRetryConfig, HostConfig};
19use crate::pack::{FlowDescriptor, PackRuntime};
20use crate::runner::invocation::{InvocationMeta, build_invocation_envelope};
21use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
22#[cfg(feature = "fault-injection")]
23use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
24use crate::validate::{
25 ValidationConfig, ValidationIssue, ValidationMode, validate_component_envelope,
26 validate_tool_envelope,
27};
28use greentic_types::{Flow, Node, NodeId, Routing};
29
30pub struct FlowEngine {
31 packs: Vec<Arc<PackRuntime>>,
32 flows: Vec<FlowDescriptor>,
33 flow_sources: HashMap<FlowKey, usize>,
34 flow_cache: RwLock<HashMap<FlowKey, HostFlow>>,
35 default_env: String,
36 validation: ValidationConfig,
37}
38
39#[derive(Clone, Debug, PartialEq, Eq, Hash)]
40struct FlowKey {
41 pack_id: String,
42 flow_id: String,
43}
44
45#[derive(Clone, Debug, Serialize, Deserialize)]
46pub struct FlowSnapshot {
47 pub pack_id: String,
48 pub flow_id: String,
49 pub next_node: String,
50 pub state: ExecutionState,
51}
52
53#[derive(Clone, Debug)]
54pub struct FlowWait {
55 pub reason: Option<String>,
56 pub snapshot: FlowSnapshot,
57}
58
59#[derive(Clone, Debug)]
60pub enum FlowStatus {
61 Completed,
62 Waiting(Box<FlowWait>),
63}
64
65#[derive(Clone, Debug)]
66pub struct FlowExecution {
67 pub output: Value,
68 pub status: FlowStatus,
69}
70
71#[derive(Clone, Debug)]
72struct HostFlow {
73 id: String,
74 start: Option<NodeId>,
75 nodes: IndexMap<NodeId, HostNode>,
76}
77
78#[derive(Clone, Debug)]
79pub struct HostNode {
80 kind: NodeKind,
81 pub component: String,
83 component_id: String,
84 operation_name: Option<String>,
85 operation_in_mapping: Option<String>,
86 payload_expr: Value,
87 routing: Routing,
88}
89
90impl HostNode {
91 pub fn component_id(&self) -> &str {
92 &self.component_id
93 }
94
95 pub fn operation_name(&self) -> Option<&str> {
96 self.operation_name.as_deref()
97 }
98
99 pub fn operation_in_mapping(&self) -> Option<&str> {
100 self.operation_in_mapping.as_deref()
101 }
102}
103
104#[derive(Clone, Debug)]
105enum NodeKind {
106 Exec { target_component: String },
107 PackComponent { component_ref: String },
108 ProviderInvoke,
109 FlowCall,
110 BuiltinEmit { kind: EmitKind },
111 Wait,
112}
113
114#[derive(Clone, Debug)]
115enum EmitKind {
116 Log,
117 Response,
118 Other(String),
119}
120
121struct ComponentOverrides<'a> {
122 component: Option<&'a str>,
123 operation: Option<&'a str>,
124}
125
126struct ComponentCall {
127 component_ref: String,
128 operation: String,
129 input: Value,
130 config: Value,
131}
132
133impl FlowExecution {
134 fn completed(output: Value) -> Self {
135 Self {
136 output,
137 status: FlowStatus::Completed,
138 }
139 }
140
141 fn waiting(output: Value, wait: FlowWait) -> Self {
142 Self {
143 output,
144 status: FlowStatus::Waiting(Box::new(wait)),
145 }
146 }
147}
148
149impl FlowEngine {
150 pub async fn new(packs: Vec<Arc<PackRuntime>>, config: Arc<HostConfig>) -> Result<Self> {
151 let mut flow_sources: HashMap<FlowKey, usize> = HashMap::new();
152 let mut descriptors = Vec::new();
153 let mut bindings = HashMap::new();
154 for pack in &config.pack_bindings {
155 bindings.insert(pack.pack_id.clone(), pack.flows.clone());
156 }
157 let enforce_bindings = !bindings.is_empty();
158 for (idx, pack) in packs.iter().enumerate() {
159 let pack_id = pack.metadata().pack_id.clone();
160 if enforce_bindings && !bindings.contains_key(&pack_id) {
161 bail!("no gtbind entries found for pack {}", pack_id);
162 }
163 let flows = pack.list_flows().await?;
164 let allowed = bindings.get(&pack_id).map(|flows| {
165 flows
166 .iter()
167 .cloned()
168 .collect::<std::collections::HashSet<_>>()
169 });
170 let mut seen = std::collections::HashSet::new();
171 for flow in flows {
172 if let Some(ref allow) = allowed
173 && !allow.contains(&flow.id)
174 {
175 continue;
176 }
177 seen.insert(flow.id.clone());
178 tracing::info!(
179 flow_id = %flow.id,
180 flow_type = %flow.flow_type,
181 pack_id = %flow.pack_id,
182 pack_index = idx,
183 "registered flow"
184 );
185 flow_sources.insert(
186 FlowKey {
187 pack_id: flow.pack_id.clone(),
188 flow_id: flow.id.clone(),
189 },
190 idx,
191 );
192 descriptors.retain(|existing: &FlowDescriptor| {
193 !(existing.id == flow.id && existing.pack_id == flow.pack_id)
194 });
195 descriptors.push(flow);
196 }
197 if let Some(allow) = allowed {
198 let missing = allow.difference(&seen).cloned().collect::<Vec<_>>();
199 if !missing.is_empty() {
200 bail!(
201 "gtbind flow ids missing in pack {}: {}",
202 pack_id,
203 missing.join(", ")
204 );
205 }
206 }
207 }
208
209 let mut flow_map = HashMap::new();
210 for flow in &descriptors {
211 let pack_id = flow.pack_id.clone();
212 if let Some(&pack_idx) = flow_sources.get(&FlowKey {
213 pack_id: pack_id.clone(),
214 flow_id: flow.id.clone(),
215 }) {
216 let pack_clone = Arc::clone(&packs[pack_idx]);
217 let flow_id = flow.id.clone();
218 let task_flow_id = flow_id.clone();
219 match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
220 Ok(Ok(loaded_flow)) => {
221 flow_map.insert(
222 FlowKey {
223 pack_id: pack_id.clone(),
224 flow_id,
225 },
226 HostFlow::from(loaded_flow),
227 );
228 }
229 Ok(Err(err)) => {
230 tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
231 }
232 Err(err) => {
233 tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
234 }
235 }
236 }
237 }
238
239 Ok(Self {
240 packs,
241 flows: descriptors,
242 flow_sources,
243 flow_cache: RwLock::new(flow_map),
244 default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
245 validation: config.validation.clone(),
246 })
247 }
248
249 async fn get_or_load_flow(&self, pack_id: &str, flow_id: &str) -> Result<HostFlow> {
250 let key = FlowKey {
251 pack_id: pack_id.to_string(),
252 flow_id: flow_id.to_string(),
253 };
254 if let Some(flow) = self.flow_cache.read().get(&key).cloned() {
255 return Ok(flow);
256 }
257
258 let pack_idx = *self
259 .flow_sources
260 .get(&key)
261 .with_context(|| format!("flow {pack_id}:{flow_id} not registered"))?;
262 let pack = Arc::clone(&self.packs[pack_idx]);
263 let flow_id_owned = flow_id.to_string();
264 let task_flow_id = flow_id_owned.clone();
265 let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
266 .await
267 .context("failed to join flow metadata task")??;
268 let host_flow = HostFlow::from(flow);
269 self.flow_cache.write().insert(
270 FlowKey {
271 pack_id: pack_id.to_string(),
272 flow_id: flow_id_owned.clone(),
273 },
274 host_flow.clone(),
275 );
276 Ok(host_flow)
277 }
278
279 pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
280 let span = tracing::info_span!(
281 "flow.execute",
282 tenant = tracing::field::Empty,
283 flow_id = tracing::field::Empty,
284 node_id = tracing::field::Empty,
285 tool = tracing::field::Empty,
286 action = tracing::field::Empty
287 );
288 annotate_span(
289 &span,
290 &FlowSpanAttributes {
291 tenant: ctx.tenant,
292 flow_id: ctx.flow_id,
293 node_id: ctx.node_id,
294 tool: ctx.tool,
295 action: ctx.action,
296 },
297 );
298 set_flow_context(
299 &self.default_env,
300 ctx.tenant,
301 ctx.flow_id,
302 ctx.node_id,
303 ctx.provider_id,
304 ctx.session_id,
305 );
306 let retry_config = ctx.retry_config;
307 let original_input = input;
308 let mut ctx = ctx;
309 async move {
310 let mut attempt = 0u32;
311 loop {
312 attempt += 1;
313 ctx.attempt = attempt;
314 #[cfg(feature = "fault-injection")]
315 {
316 let fault_ctx = FaultContext {
317 pack_id: ctx.pack_id,
318 flow_id: ctx.flow_id,
319 node_id: ctx.node_id,
320 attempt: ctx.attempt,
321 };
322 maybe_fail(FaultPoint::Timeout, fault_ctx)
323 .map_err(|err| anyhow!(err.to_string()))?;
324 }
325 match self.execute_once(&ctx, original_input.clone()).await {
326 Ok(value) => return Ok(value),
327 Err(err) => {
328 if attempt >= retry_config.max_attempts || !should_retry(&err) {
329 return Err(err);
330 }
331 let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
332 tracing::warn!(
333 tenant = ctx.tenant,
334 flow_id = ctx.flow_id,
335 attempt,
336 max_attempts = retry_config.max_attempts,
337 delay_ms = delay,
338 error = %err,
339 "transient flow execution failure, backing off"
340 );
341 tokio::time::sleep(Duration::from_millis(delay)).await;
342 }
343 }
344 }
345 }
346 .instrument(span)
347 .await
348 }
349
350 pub async fn resume(
351 &self,
352 ctx: FlowContext<'_>,
353 snapshot: FlowSnapshot,
354 input: Value,
355 ) -> Result<FlowExecution> {
356 if snapshot.pack_id != ctx.pack_id {
357 bail!(
358 "snapshot pack {} does not match requested {}",
359 snapshot.pack_id,
360 ctx.pack_id
361 );
362 }
363 if snapshot.flow_id != ctx.flow_id {
364 bail!(
365 "snapshot flow {} does not match requested {}",
366 snapshot.flow_id,
367 ctx.flow_id
368 );
369 }
370 let flow_ir = self.get_or_load_flow(ctx.pack_id, ctx.flow_id).await?;
371 let mut state = snapshot.state;
372 state.replace_input(input);
373 state.ensure_entry();
374 self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
375 .await
376 }
377
378 async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
379 let flow_ir = self.get_or_load_flow(ctx.pack_id, ctx.flow_id).await?;
380 let state = ExecutionState::new(input);
381 self.drive_flow(ctx, flow_ir, state, None).await
382 }
383
384 async fn drive_flow(
385 &self,
386 ctx: &FlowContext<'_>,
387 flow_ir: HostFlow,
388 mut state: ExecutionState,
389 resume_from: Option<String>,
390 ) -> Result<FlowExecution> {
391 let mut current = match resume_from {
392 Some(node) => NodeId::from_str(&node)
393 .with_context(|| format!("invalid resume node id `{node}`"))?,
394 None => flow_ir
395 .start
396 .clone()
397 .or_else(|| flow_ir.nodes.keys().next().cloned())
398 .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
399 };
400
401 loop {
402 let node = flow_ir
403 .nodes
404 .get(¤t)
405 .with_context(|| format!("node {} not found", current.as_str()))?;
406
407 let payload_template = node.payload_expr.clone();
408 let prev = state
409 .last_output
410 .as_ref()
411 .cloned()
412 .unwrap_or_else(|| Value::Object(JsonMap::new()));
413 let ctx_value = template_context(&state, prev);
414 #[cfg(feature = "fault-injection")]
415 {
416 let fault_ctx = FaultContext {
417 pack_id: ctx.pack_id,
418 flow_id: ctx.flow_id,
419 node_id: Some(current.as_str()),
420 attempt: ctx.attempt,
421 };
422 maybe_fail(FaultPoint::TemplateRender, fault_ctx)
423 .map_err(|err| anyhow!(err.to_string()))?;
424 }
425 let payload =
426 render_template_value(&payload_template, &ctx_value, TemplateOptions::default())
427 .context("failed to render node input template")?;
428 let observed_payload = payload.clone();
429 let node_id = current.clone();
430 let event = NodeEvent {
431 context: ctx,
432 node_id: node_id.as_str(),
433 node,
434 payload: &observed_payload,
435 };
436 if let Some(observer) = ctx.observer {
437 observer.on_node_start(&event);
438 }
439 let dispatch = self
440 .dispatch_node(ctx, node_id.as_str(), node, &mut state, payload, &event)
441 .await;
442 let DispatchOutcome {
443 output,
444 wait_reason,
445 } = match dispatch {
446 Ok(outcome) => outcome,
447 Err(err) => {
448 if let Some(observer) = ctx.observer {
449 observer.on_node_error(&event, err.as_ref());
450 }
451 return Err(err);
452 }
453 };
454
455 state.nodes.insert(node_id.clone().into(), output.clone());
456 state.last_output = Some(output.payload.clone());
457 if let Some(observer) = ctx.observer {
458 observer.on_node_end(&event, &output.payload);
459 }
460
461 let (next, should_exit) = match &node.routing {
462 Routing::Next { node_id } => (Some(node_id.clone()), false),
463 Routing::End | Routing::Reply => (None, true),
464 Routing::Branch { default, .. } => (default.clone(), default.is_none()),
465 Routing::Custom(raw) => {
466 tracing::warn!(
467 flow_id = %flow_ir.id,
468 node_id = %node_id,
469 routing = ?raw,
470 "unsupported routing; terminating flow"
471 );
472 (None, true)
473 }
474 };
475
476 if let Some(wait_reason) = wait_reason {
477 let resume_target = next.clone().ok_or_else(|| {
478 anyhow!(
479 "session.wait node {} requires a non-empty route",
480 current.as_str()
481 )
482 })?;
483 let mut snapshot_state = state.clone();
484 snapshot_state.clear_egress();
485 let snapshot = FlowSnapshot {
486 pack_id: ctx.pack_id.to_string(),
487 flow_id: ctx.flow_id.to_string(),
488 next_node: resume_target.as_str().to_string(),
489 state: snapshot_state,
490 };
491 let output_value = state.clone().finalize_with(None);
492 return Ok(FlowExecution::waiting(
493 output_value,
494 FlowWait {
495 reason: Some(wait_reason),
496 snapshot,
497 },
498 ));
499 }
500
501 if should_exit {
502 return Ok(FlowExecution::completed(
503 state.finalize_with(Some(output.payload.clone())),
504 ));
505 }
506
507 match next {
508 Some(n) => current = n,
509 None => {
510 return Ok(FlowExecution::completed(
511 state.finalize_with(Some(output.payload.clone())),
512 ));
513 }
514 }
515 }
516 }
517
518 async fn dispatch_node(
519 &self,
520 ctx: &FlowContext<'_>,
521 node_id: &str,
522 node: &HostNode,
523 state: &mut ExecutionState,
524 payload: Value,
525 event: &NodeEvent<'_>,
526 ) -> Result<DispatchOutcome> {
527 match &node.kind {
528 NodeKind::Exec { target_component } => self
529 .execute_component_exec(
530 ctx,
531 node_id,
532 node,
533 payload,
534 event,
535 ComponentOverrides {
536 component: Some(target_component.as_str()),
537 operation: node.operation_name.as_deref(),
538 },
539 )
540 .await
541 .map(DispatchOutcome::complete),
542 NodeKind::PackComponent { component_ref } => self
543 .execute_component_call(ctx, node_id, node, payload, component_ref.as_str(), event)
544 .await
545 .map(DispatchOutcome::complete),
546 NodeKind::FlowCall => self
547 .execute_flow_call(ctx, payload)
548 .await
549 .map(DispatchOutcome::complete),
550 NodeKind::ProviderInvoke => self
551 .execute_provider_invoke(ctx, node_id, state, payload, event)
552 .await
553 .map(DispatchOutcome::complete),
554 NodeKind::BuiltinEmit { kind } => {
555 match kind {
556 EmitKind::Log | EmitKind::Response => {}
557 EmitKind::Other(component) => {
558 tracing::debug!(%component, "handling emit.* as builtin");
559 }
560 }
561 state.push_egress(payload.clone());
562 Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
563 }
564 NodeKind::Wait => {
565 let reason = extract_wait_reason(&payload);
566 Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
567 }
568 }
569 }
570
571 async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
572 #[derive(Deserialize)]
573 struct FlowCallPayload {
574 #[serde(alias = "flow")]
575 flow_id: String,
576 #[serde(default)]
577 input: Value,
578 }
579
580 let call: FlowCallPayload =
581 serde_json::from_value(payload).context("invalid payload for flow.call node")?;
582 if call.flow_id.trim().is_empty() {
583 bail!("flow.call requires a non-empty flow_id");
584 }
585
586 let sub_input = if call.input.is_null() {
587 Value::Null
588 } else {
589 call.input
590 };
591
592 let flow_id_owned = call.flow_id;
593 let action = "flow.call";
594 let sub_ctx = FlowContext {
595 tenant: ctx.tenant,
596 pack_id: ctx.pack_id,
597 flow_id: flow_id_owned.as_str(),
598 node_id: None,
599 tool: ctx.tool,
600 action: Some(action),
601 session_id: ctx.session_id,
602 provider_id: ctx.provider_id,
603 retry_config: ctx.retry_config,
604 attempt: ctx.attempt,
605 observer: ctx.observer,
606 mocks: ctx.mocks,
607 };
608
609 let execution = Box::pin(self.execute(sub_ctx, sub_input))
610 .await
611 .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
612 match execution.status {
613 FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
614 FlowStatus::Waiting(wait) => bail!(
615 "flow.call cannot pause (flow {} waiting {:?})",
616 flow_id_owned,
617 wait.reason
618 ),
619 }
620 }
621
622 async fn execute_component_exec(
623 &self,
624 ctx: &FlowContext<'_>,
625 node_id: &str,
626 node: &HostNode,
627 payload: Value,
628 event: &NodeEvent<'_>,
629 overrides: ComponentOverrides<'_>,
630 ) -> Result<NodeOutput> {
631 #[derive(Deserialize)]
632 struct ComponentPayload {
633 #[serde(default, alias = "component_ref", alias = "component")]
634 component: Option<String>,
635 #[serde(alias = "op")]
636 operation: Option<String>,
637 #[serde(default)]
638 input: Value,
639 #[serde(default)]
640 config: Value,
641 }
642
643 let payload: ComponentPayload =
644 serde_json::from_value(payload).context("invalid payload for component.exec")?;
645 let component_ref = overrides
646 .component
647 .map(str::to_string)
648 .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
649 .with_context(|| "component.exec requires a component_ref")?;
650 let operation = resolve_component_operation(
651 node_id,
652 node.component_id.as_str(),
653 payload.operation,
654 overrides.operation,
655 node.operation_in_mapping.as_deref(),
656 )?;
657 let call = ComponentCall {
658 component_ref,
659 operation,
660 input: payload.input,
661 config: payload.config,
662 };
663
664 self.invoke_component_call(ctx, node_id, call, event).await
665 }
666
667 async fn execute_component_call(
668 &self,
669 ctx: &FlowContext<'_>,
670 node_id: &str,
671 node: &HostNode,
672 payload: Value,
673 component_ref: &str,
674 event: &NodeEvent<'_>,
675 ) -> Result<NodeOutput> {
676 let payload_operation = extract_operation_from_mapping(&payload);
677 let (input, config) = split_operation_payload(payload);
678 let operation = resolve_component_operation(
679 node_id,
680 node.component_id.as_str(),
681 payload_operation,
682 node.operation_name.as_deref(),
683 node.operation_in_mapping.as_deref(),
684 )?;
685 let call = ComponentCall {
686 component_ref: component_ref.to_string(),
687 operation,
688 input,
689 config,
690 };
691 self.invoke_component_call(ctx, node_id, call, event).await
692 }
693
694 async fn invoke_component_call(
695 &self,
696 ctx: &FlowContext<'_>,
697 node_id: &str,
698 call: ComponentCall,
699 event: &NodeEvent<'_>,
700 ) -> Result<NodeOutput> {
701 self.validate_component(ctx, event, &call)?;
702 let meta = InvocationMeta {
704 env: &self.default_env,
705 tenant: ctx.tenant,
706 flow_id: ctx.flow_id,
707 node_id: Some(node_id),
708 provider_id: ctx.provider_id,
709 session_id: ctx.session_id,
710 attempt: ctx.attempt,
711 };
712 let invocation_envelope =
713 build_invocation_envelope(meta, call.operation.as_str(), call.input)
714 .context("build invocation envelope for component call")?;
715 let input_json = serde_json::to_string(&invocation_envelope)?;
716 let config_json = if call.config.is_null() {
717 None
718 } else {
719 Some(serde_json::to_string(&call.config)?)
720 };
721
722 let key = FlowKey {
723 pack_id: ctx.pack_id.to_string(),
724 flow_id: ctx.flow_id.to_string(),
725 };
726 let pack_idx = *self.flow_sources.get(&key).with_context(|| {
727 format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
728 })?;
729 let pack = Arc::clone(&self.packs[pack_idx]);
730 let exec_ctx = component_exec_ctx(ctx, node_id);
731 #[cfg(feature = "fault-injection")]
732 {
733 let fault_ctx = FaultContext {
734 pack_id: ctx.pack_id,
735 flow_id: ctx.flow_id,
736 node_id: Some(node_id),
737 attempt: ctx.attempt,
738 };
739 maybe_fail(FaultPoint::BeforeComponentCall, fault_ctx)
740 .map_err(|err| anyhow!(err.to_string()))?;
741 }
742 let value = pack
743 .invoke_component(
744 call.component_ref.as_str(),
745 exec_ctx,
746 call.operation.as_str(),
747 config_json,
748 input_json,
749 )
750 .await?;
751 #[cfg(feature = "fault-injection")]
752 {
753 let fault_ctx = FaultContext {
754 pack_id: ctx.pack_id,
755 flow_id: ctx.flow_id,
756 node_id: Some(node_id),
757 attempt: ctx.attempt,
758 };
759 maybe_fail(FaultPoint::AfterComponentCall, fault_ctx)
760 .map_err(|err| anyhow!(err.to_string()))?;
761 }
762
763 if let Some((code, message)) = component_error(&value) {
764 bail!(
765 "component {} failed: {}: {}",
766 call.component_ref,
767 code,
768 message
769 );
770 }
771 Ok(NodeOutput::new(value))
772 }
773
774 async fn execute_provider_invoke(
775 &self,
776 ctx: &FlowContext<'_>,
777 node_id: &str,
778 state: &ExecutionState,
779 payload: Value,
780 event: &NodeEvent<'_>,
781 ) -> Result<NodeOutput> {
782 #[derive(Deserialize)]
783 struct ProviderPayload {
784 #[serde(default)]
785 provider_id: Option<String>,
786 #[serde(default)]
787 provider_type: Option<String>,
788 #[serde(default, alias = "operation")]
789 op: Option<String>,
790 #[serde(default)]
791 input: Value,
792 #[serde(default)]
793 in_map: Value,
794 #[serde(default)]
795 out_map: Value,
796 #[serde(default)]
797 err_map: Value,
798 }
799
800 let payload: ProviderPayload =
801 serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
802 let op = payload
803 .op
804 .as_deref()
805 .filter(|v| !v.trim().is_empty())
806 .with_context(|| "provider.invoke requires an op")?
807 .to_string();
808
809 let prev = state
810 .last_output
811 .as_ref()
812 .cloned()
813 .unwrap_or_else(|| Value::Object(JsonMap::new()));
814 let base_ctx = template_context(state, prev);
815
816 let input_value = if !payload.in_map.is_null() {
817 let mut ctx_value = base_ctx.clone();
818 if let Value::Object(ref mut map) = ctx_value {
819 map.insert("input".into(), payload.input.clone());
820 map.insert("result".into(), payload.input.clone());
821 }
822 render_template_value(
823 &payload.in_map,
824 &ctx_value,
825 TemplateOptions {
826 allow_pointer: true,
827 },
828 )
829 .context("failed to render provider.invoke in_map")?
830 } else if !payload.input.is_null() {
831 payload.input
832 } else {
833 Value::Null
834 };
835 let input_json = serde_json::to_vec(&input_value)?;
836
837 self.validate_tool(
838 ctx,
839 event,
840 payload.provider_id.as_deref(),
841 payload.provider_type.as_deref(),
842 &op,
843 &input_value,
844 )?;
845
846 let key = FlowKey {
847 pack_id: ctx.pack_id.to_string(),
848 flow_id: ctx.flow_id.to_string(),
849 };
850 let pack_idx = *self.flow_sources.get(&key).with_context(|| {
851 format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
852 })?;
853 let pack = Arc::clone(&self.packs[pack_idx]);
854 let binding = pack.resolve_provider(
855 payload.provider_id.as_deref(),
856 payload.provider_type.as_deref(),
857 )?;
858 let exec_ctx = component_exec_ctx(ctx, node_id);
859 #[cfg(feature = "fault-injection")]
860 {
861 let fault_ctx = FaultContext {
862 pack_id: ctx.pack_id,
863 flow_id: ctx.flow_id,
864 node_id: Some(node_id),
865 attempt: ctx.attempt,
866 };
867 maybe_fail(FaultPoint::BeforeToolCall, fault_ctx)
868 .map_err(|err| anyhow!(err.to_string()))?;
869 }
870 let result = pack
871 .invoke_provider(&binding, exec_ctx, &op, input_json)
872 .await?;
873 #[cfg(feature = "fault-injection")]
874 {
875 let fault_ctx = FaultContext {
876 pack_id: ctx.pack_id,
877 flow_id: ctx.flow_id,
878 node_id: Some(node_id),
879 attempt: ctx.attempt,
880 };
881 maybe_fail(FaultPoint::AfterToolCall, fault_ctx)
882 .map_err(|err| anyhow!(err.to_string()))?;
883 }
884
885 let output = if payload.out_map.is_null() {
886 result
887 } else {
888 let mut ctx_value = base_ctx;
889 if let Value::Object(ref mut map) = ctx_value {
890 map.insert("input".into(), result.clone());
891 map.insert("result".into(), result.clone());
892 }
893 render_template_value(
894 &payload.out_map,
895 &ctx_value,
896 TemplateOptions {
897 allow_pointer: true,
898 },
899 )
900 .context("failed to render provider.invoke out_map")?
901 };
902 let _ = payload.err_map;
903 Ok(NodeOutput::new(output))
904 }
905
906 fn validate_component(
907 &self,
908 ctx: &FlowContext<'_>,
909 event: &NodeEvent<'_>,
910 call: &ComponentCall,
911 ) -> Result<()> {
912 if self.validation.mode == ValidationMode::Off {
913 return Ok(());
914 }
915 let mut metadata = JsonMap::new();
916 metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
917 if let Some(id) = ctx.session_id {
918 metadata.insert("session".to_string(), json!({ "id": id }));
919 }
920 let envelope = json!({
921 "component_id": call.component_ref,
922 "operation": call.operation,
923 "input": call.input,
924 "config": call.config,
925 "metadata": Value::Object(metadata),
926 });
927 let issues = validate_component_envelope(&envelope);
928 self.report_validation(ctx, event, "component", issues)
929 }
930
931 fn validate_tool(
932 &self,
933 ctx: &FlowContext<'_>,
934 event: &NodeEvent<'_>,
935 provider_id: Option<&str>,
936 provider_type: Option<&str>,
937 operation: &str,
938 input: &Value,
939 ) -> Result<()> {
940 if self.validation.mode == ValidationMode::Off {
941 return Ok(());
942 }
943 let tool_id = provider_id.or(provider_type).unwrap_or("provider.invoke");
944 let mut metadata = JsonMap::new();
945 metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
946 if let Some(id) = ctx.session_id {
947 metadata.insert("session".to_string(), json!({ "id": id }));
948 }
949 let envelope = json!({
950 "tool_id": tool_id,
951 "operation": operation,
952 "input": input,
953 "metadata": Value::Object(metadata),
954 });
955 let issues = validate_tool_envelope(&envelope);
956 self.report_validation(ctx, event, "tool", issues)
957 }
958
959 fn report_validation(
960 &self,
961 ctx: &FlowContext<'_>,
962 event: &NodeEvent<'_>,
963 kind: &str,
964 issues: Vec<ValidationIssue>,
965 ) -> Result<()> {
966 if issues.is_empty() {
967 return Ok(());
968 }
969 if let Some(observer) = ctx.observer {
970 observer.on_validation(event, &issues);
971 }
972 match self.validation.mode {
973 ValidationMode::Warn => {
974 tracing::warn!(
975 tenant = ctx.tenant,
976 flow_id = ctx.flow_id,
977 node_id = event.node_id,
978 kind,
979 issues = ?issues,
980 "invocation envelope validation issues"
981 );
982 Ok(())
983 }
984 ValidationMode::Error => {
985 tracing::error!(
986 tenant = ctx.tenant,
987 flow_id = ctx.flow_id,
988 node_id = event.node_id,
989 kind,
990 issues = ?issues,
991 "invocation envelope validation failed"
992 );
993 bail!("invocation_validation_failed");
994 }
995 ValidationMode::Off => Ok(()),
996 }
997 }
998
999 pub fn flows(&self) -> &[FlowDescriptor] {
1000 &self.flows
1001 }
1002
1003 pub fn flow_by_key(&self, pack_id: &str, flow_id: &str) -> Option<&FlowDescriptor> {
1004 self.flows
1005 .iter()
1006 .find(|descriptor| descriptor.pack_id == pack_id && descriptor.id == flow_id)
1007 }
1008
1009 pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
1010 let mut matches = self
1011 .flows
1012 .iter()
1013 .filter(|descriptor| descriptor.flow_type == flow_type);
1014 let first = matches.next()?;
1015 if matches.next().is_some() {
1016 return None;
1017 }
1018 Some(first)
1019 }
1020
1021 pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
1022 let mut matches = self
1023 .flows
1024 .iter()
1025 .filter(|descriptor| descriptor.id == flow_id);
1026 let first = matches.next()?;
1027 if matches.next().is_some() {
1028 return None;
1029 }
1030 Some(first)
1031 }
1032}
1033
1034pub trait ExecutionObserver: Send + Sync {
1035 fn on_node_start(&self, event: &NodeEvent<'_>);
1036 fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
1037 fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
1038 fn on_validation(&self, _event: &NodeEvent<'_>, _issues: &[ValidationIssue]) {}
1039}
1040
1041pub struct NodeEvent<'a> {
1042 pub context: &'a FlowContext<'a>,
1043 pub node_id: &'a str,
1044 pub node: &'a HostNode,
1045 pub payload: &'a Value,
1046}
1047
1048#[derive(Clone, Debug, Serialize, Deserialize)]
1049pub struct ExecutionState {
1050 #[serde(default)]
1051 entry: Value,
1052 #[serde(default)]
1053 input: Value,
1054 #[serde(default)]
1055 nodes: HashMap<String, NodeOutput>,
1056 #[serde(default)]
1057 egress: Vec<Value>,
1058 #[serde(default, skip_serializing_if = "Option::is_none")]
1059 last_output: Option<Value>,
1060}
1061
1062impl ExecutionState {
1063 fn new(input: Value) -> Self {
1064 Self {
1065 entry: input.clone(),
1066 input,
1067 nodes: HashMap::new(),
1068 egress: Vec::new(),
1069 last_output: None,
1070 }
1071 }
1072
1073 fn ensure_entry(&mut self) {
1074 if self.entry.is_null() {
1075 self.entry = self.input.clone();
1076 }
1077 }
1078
1079 fn context(&self) -> Value {
1080 let mut nodes = JsonMap::new();
1081 for (id, output) in &self.nodes {
1082 nodes.insert(
1083 id.clone(),
1084 json!({
1085 "ok": output.ok,
1086 "payload": output.payload.clone(),
1087 "meta": output.meta.clone(),
1088 }),
1089 );
1090 }
1091 json!({
1092 "entry": self.entry.clone(),
1093 "input": self.input.clone(),
1094 "nodes": nodes,
1095 })
1096 }
1097
1098 fn outputs_map(&self) -> JsonMap<String, Value> {
1099 let mut outputs = JsonMap::new();
1100 for (id, output) in &self.nodes {
1101 outputs.insert(id.clone(), output.payload.clone());
1102 }
1103 outputs
1104 }
1105 fn push_egress(&mut self, payload: Value) {
1106 self.egress.push(payload);
1107 }
1108
1109 fn replace_input(&mut self, input: Value) {
1110 self.input = input;
1111 }
1112
1113 fn clear_egress(&mut self) {
1114 self.egress.clear();
1115 }
1116
1117 fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
1118 if self.egress.is_empty() {
1119 return final_payload.unwrap_or(Value::Null);
1120 }
1121 let mut emitted = std::mem::take(&mut self.egress);
1122 if let Some(value) = final_payload {
1123 match value {
1124 Value::Null => {}
1125 Value::Array(items) => emitted.extend(items),
1126 other => emitted.push(other),
1127 }
1128 }
1129 Value::Array(emitted)
1130 }
1131}
1132
1133#[derive(Clone, Debug, Serialize, Deserialize)]
1134struct NodeOutput {
1135 ok: bool,
1136 payload: Value,
1137 meta: Value,
1138}
1139
1140impl NodeOutput {
1141 fn new(payload: Value) -> Self {
1142 Self {
1143 ok: true,
1144 payload,
1145 meta: Value::Null,
1146 }
1147 }
1148}
1149
1150struct DispatchOutcome {
1151 output: NodeOutput,
1152 wait_reason: Option<String>,
1153}
1154
1155impl DispatchOutcome {
1156 fn complete(output: NodeOutput) -> Self {
1157 Self {
1158 output,
1159 wait_reason: None,
1160 }
1161 }
1162
1163 fn wait(output: NodeOutput, reason: Option<String>) -> Self {
1164 Self {
1165 output,
1166 wait_reason: reason,
1167 }
1168 }
1169}
1170
1171fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
1172 ComponentExecCtx {
1173 tenant: ComponentTenantCtx {
1174 tenant: ctx.tenant.to_string(),
1175 team: None,
1176 user: ctx.provider_id.map(str::to_string),
1177 trace_id: None,
1178 correlation_id: ctx.session_id.map(str::to_string),
1179 deadline_unix_ms: None,
1180 attempt: ctx.attempt,
1181 idempotency_key: ctx.session_id.map(str::to_string),
1182 },
1183 flow_id: ctx.flow_id.to_string(),
1184 node_id: Some(node_id.to_string()),
1185 }
1186}
1187
1188fn component_error(value: &Value) -> Option<(String, String)> {
1189 let obj = value.as_object()?;
1190 let ok = obj.get("ok").and_then(Value::as_bool)?;
1191 if ok {
1192 return None;
1193 }
1194 let err = obj.get("error")?.as_object()?;
1195 let code = err
1196 .get("code")
1197 .and_then(Value::as_str)
1198 .unwrap_or("component_error");
1199 let message = err
1200 .get("message")
1201 .and_then(Value::as_str)
1202 .unwrap_or("component reported error");
1203 Some((code.to_string(), message.to_string()))
1204}
1205
1206fn extract_wait_reason(payload: &Value) -> Option<String> {
1207 match payload {
1208 Value::String(s) => Some(s.clone()),
1209 Value::Object(map) => map
1210 .get("reason")
1211 .and_then(Value::as_str)
1212 .map(|value| value.to_string()),
1213 _ => None,
1214 }
1215}
1216
1217fn template_context(state: &ExecutionState, prev: Value) -> Value {
1218 let entry = if state.entry.is_null() {
1219 Value::Object(JsonMap::new())
1220 } else {
1221 state.entry.clone()
1222 };
1223 let mut ctx = JsonMap::new();
1224 ctx.insert("entry".into(), entry);
1225 ctx.insert("prev".into(), prev);
1226 ctx.insert("node".into(), Value::Object(state.outputs_map()));
1227 ctx.insert("state".into(), state.context());
1228 Value::Object(ctx)
1229}
1230
1231impl From<Flow> for HostFlow {
1232 fn from(value: Flow) -> Self {
1233 let mut nodes = IndexMap::new();
1234 for (id, node) in value.nodes {
1235 nodes.insert(id.clone(), HostNode::from(node));
1236 }
1237 let start = value
1238 .entrypoints
1239 .get("default")
1240 .and_then(Value::as_str)
1241 .and_then(|id| NodeId::from_str(id).ok())
1242 .or_else(|| nodes.keys().next().cloned());
1243 Self {
1244 id: value.id.as_str().to_string(),
1245 start,
1246 nodes,
1247 }
1248 }
1249}
1250
1251impl From<Node> for HostNode {
1252 fn from(node: Node) -> Self {
1253 let component_ref = node.component.id.as_str().to_string();
1254 let raw_operation = node.component.operation.clone();
1255 let operation_in_mapping = extract_operation_from_mapping(&node.input.mapping);
1256 let operation_is_component_exec = raw_operation.as_deref() == Some("component.exec");
1257 let operation_is_emit = raw_operation
1258 .as_deref()
1259 .map(|op| op.starts_with("emit."))
1260 .unwrap_or(false);
1261 let is_component_exec = component_ref == "component.exec" || operation_is_component_exec;
1262
1263 let kind = if is_component_exec {
1264 let target = if component_ref == "component.exec" {
1265 if let Some(op) = raw_operation
1266 .as_deref()
1267 .filter(|op| op.starts_with("emit."))
1268 {
1269 op.to_string()
1270 } else {
1271 extract_target_component(&node.input.mapping)
1272 .unwrap_or_else(|| "component.exec".to_string())
1273 }
1274 } else {
1275 extract_target_component(&node.input.mapping)
1276 .unwrap_or_else(|| component_ref.clone())
1277 };
1278 if target.starts_with("emit.") {
1279 NodeKind::BuiltinEmit {
1280 kind: emit_kind_from_ref(&target),
1281 }
1282 } else {
1283 NodeKind::Exec {
1284 target_component: target,
1285 }
1286 }
1287 } else if operation_is_emit {
1288 NodeKind::BuiltinEmit {
1289 kind: emit_kind_from_ref(raw_operation.as_deref().unwrap_or("emit.log")),
1290 }
1291 } else {
1292 match component_ref.as_str() {
1293 "flow.call" => NodeKind::FlowCall,
1294 "provider.invoke" => NodeKind::ProviderInvoke,
1295 "session.wait" => NodeKind::Wait,
1296 comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
1297 kind: emit_kind_from_ref(comp),
1298 },
1299 other => NodeKind::PackComponent {
1300 component_ref: other.to_string(),
1301 },
1302 }
1303 };
1304 let component_label = match &kind {
1305 NodeKind::Exec { .. } => "component.exec".to_string(),
1306 NodeKind::PackComponent { component_ref } => component_ref.clone(),
1307 NodeKind::ProviderInvoke => "provider.invoke".to_string(),
1308 NodeKind::FlowCall => "flow.call".to_string(),
1309 NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
1310 NodeKind::Wait => "session.wait".to_string(),
1311 };
1312 let operation_name = if is_component_exec && operation_is_component_exec {
1313 None
1314 } else {
1315 raw_operation.clone()
1316 };
1317 let payload_expr = match kind {
1318 NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
1319 _ => node.input.mapping.clone(),
1320 };
1321 Self {
1322 kind,
1323 component: component_label,
1324 component_id: if is_component_exec {
1325 "component.exec".to_string()
1326 } else {
1327 component_ref
1328 },
1329 operation_name,
1330 operation_in_mapping,
1331 payload_expr,
1332 routing: node.routing,
1333 }
1334 }
1335}
1336
1337fn extract_target_component(payload: &Value) -> Option<String> {
1338 match payload {
1339 Value::Object(map) => map
1340 .get("component")
1341 .or_else(|| map.get("component_ref"))
1342 .and_then(Value::as_str)
1343 .map(|s| s.to_string()),
1344 _ => None,
1345 }
1346}
1347
1348fn extract_operation_from_mapping(payload: &Value) -> Option<String> {
1349 match payload {
1350 Value::Object(map) => map
1351 .get("operation")
1352 .or_else(|| map.get("op"))
1353 .and_then(Value::as_str)
1354 .map(str::trim)
1355 .filter(|value| !value.is_empty())
1356 .map(|value| value.to_string()),
1357 _ => None,
1358 }
1359}
1360
1361fn extract_emit_payload(payload: &Value) -> Value {
1362 if let Value::Object(map) = payload {
1363 if let Some(input) = map.get("input") {
1364 return input.clone();
1365 }
1366 if let Some(inner) = map.get("payload") {
1367 return inner.clone();
1368 }
1369 }
1370 payload.clone()
1371}
1372
1373fn split_operation_payload(payload: Value) -> (Value, Value) {
1374 if let Value::Object(mut map) = payload.clone()
1375 && map.contains_key("input")
1376 {
1377 let input = map.remove("input").unwrap_or(Value::Null);
1378 let config = map.remove("config").unwrap_or(Value::Null);
1379 let legacy_only = map.keys().all(|key| {
1380 matches!(
1381 key.as_str(),
1382 "operation" | "op" | "component" | "component_ref"
1383 )
1384 });
1385 if legacy_only {
1386 return (input, config);
1387 }
1388 }
1389 (payload, Value::Null)
1390}
1391
1392fn resolve_component_operation(
1393 node_id: &str,
1394 component_label: &str,
1395 payload_operation: Option<String>,
1396 operation_override: Option<&str>,
1397 operation_in_mapping: Option<&str>,
1398) -> Result<String> {
1399 if let Some(op) = operation_override
1400 .map(str::trim)
1401 .filter(|value| !value.is_empty())
1402 {
1403 return Ok(op.to_string());
1404 }
1405
1406 if let Some(op) = payload_operation
1407 .as_deref()
1408 .map(str::trim)
1409 .filter(|value| !value.is_empty())
1410 {
1411 return Ok(op.to_string());
1412 }
1413
1414 let mut message = format!(
1415 "missing operation for node `{}` (component `{}`); expected node.component.operation to be set",
1416 node_id, component_label,
1417 );
1418 if let Some(found) = operation_in_mapping {
1419 message.push_str(&format!(
1420 ". Found operation in input.mapping (`{}`) but this is not used; pack compiler must preserve node.component.operation.",
1421 found
1422 ));
1423 }
1424 bail!(message);
1425}
1426
1427fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
1428 match component_ref {
1429 "emit.log" => EmitKind::Log,
1430 "emit.response" => EmitKind::Response,
1431 other => EmitKind::Other(other.to_string()),
1432 }
1433}
1434
1435fn emit_ref_from_kind(kind: &EmitKind) -> String {
1436 match kind {
1437 EmitKind::Log => "emit.log".to_string(),
1438 EmitKind::Response => "emit.response".to_string(),
1439 EmitKind::Other(other) => other.clone(),
1440 }
1441}
1442
1443#[cfg(test)]
1444mod tests {
1445 use super::*;
1446 use crate::validate::{ValidationConfig, ValidationMode};
1447 use greentic_types::{
1448 Flow, FlowComponentRef, FlowId, FlowKind, InputMapping, Node, NodeId, OutputMapping,
1449 Routing, TelemetryHints,
1450 };
1451 use serde_json::json;
1452 use std::collections::BTreeMap;
1453 use std::str::FromStr;
1454 use std::sync::Mutex;
1455 use tokio::runtime::Runtime;
1456
1457 fn minimal_engine() -> FlowEngine {
1458 FlowEngine {
1459 packs: Vec::new(),
1460 flows: Vec::new(),
1461 flow_sources: HashMap::new(),
1462 flow_cache: RwLock::new(HashMap::new()),
1463 default_env: "local".to_string(),
1464 validation: ValidationConfig {
1465 mode: ValidationMode::Off,
1466 },
1467 }
1468 }
1469
1470 #[test]
1471 fn templating_renders_with_partials_and_data() {
1472 let mut state = ExecutionState::new(json!({ "city": "London" }));
1473 state.nodes.insert(
1474 "forecast".to_string(),
1475 NodeOutput::new(json!({ "temp": "20C" })),
1476 );
1477
1478 let ctx = state.context();
1480 assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
1481 }
1482
1483 #[test]
1484 fn finalize_wraps_emitted_payloads() {
1485 let mut state = ExecutionState::new(json!({}));
1486 state.push_egress(json!({ "text": "first" }));
1487 state.push_egress(json!({ "text": "second" }));
1488 let result = state.finalize_with(Some(json!({ "text": "final" })));
1489 assert_eq!(
1490 result,
1491 json!([
1492 { "text": "first" },
1493 { "text": "second" },
1494 { "text": "final" }
1495 ])
1496 );
1497 }
1498
1499 #[test]
1500 fn finalize_flattens_final_array() {
1501 let mut state = ExecutionState::new(json!({}));
1502 state.push_egress(json!({ "text": "only" }));
1503 let result = state.finalize_with(Some(json!([
1504 { "text": "extra-1" },
1505 { "text": "extra-2" }
1506 ])));
1507 assert_eq!(
1508 result,
1509 json!([
1510 { "text": "only" },
1511 { "text": "extra-1" },
1512 { "text": "extra-2" }
1513 ])
1514 );
1515 }
1516
1517 #[test]
1518 fn missing_operation_reports_node_and_component() {
1519 let engine = minimal_engine();
1520 let rt = Runtime::new().unwrap();
1521 let retry_config = RetryConfig {
1522 max_attempts: 1,
1523 base_delay_ms: 1,
1524 };
1525 let ctx = FlowContext {
1526 tenant: "tenant",
1527 pack_id: "test-pack",
1528 flow_id: "flow",
1529 node_id: Some("missing-op"),
1530 tool: None,
1531 action: None,
1532 session_id: None,
1533 provider_id: None,
1534 retry_config,
1535 attempt: 1,
1536 observer: None,
1537 mocks: None,
1538 };
1539 let node = HostNode {
1540 kind: NodeKind::Exec {
1541 target_component: "qa.process".into(),
1542 },
1543 component: "component.exec".into(),
1544 component_id: "component.exec".into(),
1545 operation_name: None,
1546 operation_in_mapping: None,
1547 payload_expr: Value::Null,
1548 routing: Routing::End,
1549 };
1550 let _state = ExecutionState::new(Value::Null);
1551 let payload = json!({ "component": "qa.process" });
1552 let event = NodeEvent {
1553 context: &ctx,
1554 node_id: "missing-op",
1555 node: &node,
1556 payload: &payload,
1557 };
1558 let err = rt
1559 .block_on(engine.execute_component_exec(
1560 &ctx,
1561 "missing-op",
1562 &node,
1563 payload.clone(),
1564 &event,
1565 ComponentOverrides {
1566 component: None,
1567 operation: None,
1568 },
1569 ))
1570 .unwrap_err();
1571 let message = err.to_string();
1572 assert!(
1573 message.contains("missing operation for node `missing-op`"),
1574 "unexpected message: {message}"
1575 );
1576 assert!(
1577 message.contains("(component `component.exec`)"),
1578 "unexpected message: {message}"
1579 );
1580 }
1581
1582 #[test]
1583 fn missing_operation_mentions_mapping_hint() {
1584 let engine = minimal_engine();
1585 let rt = Runtime::new().unwrap();
1586 let retry_config = RetryConfig {
1587 max_attempts: 1,
1588 base_delay_ms: 1,
1589 };
1590 let ctx = FlowContext {
1591 tenant: "tenant",
1592 pack_id: "test-pack",
1593 flow_id: "flow",
1594 node_id: Some("missing-op-hint"),
1595 tool: None,
1596 action: None,
1597 session_id: None,
1598 provider_id: None,
1599 retry_config,
1600 attempt: 1,
1601 observer: None,
1602 mocks: None,
1603 };
1604 let node = HostNode {
1605 kind: NodeKind::Exec {
1606 target_component: "qa.process".into(),
1607 },
1608 component: "component.exec".into(),
1609 component_id: "component.exec".into(),
1610 operation_name: None,
1611 operation_in_mapping: Some("render".into()),
1612 payload_expr: Value::Null,
1613 routing: Routing::End,
1614 };
1615 let _state = ExecutionState::new(Value::Null);
1616 let payload = json!({ "component": "qa.process" });
1617 let event = NodeEvent {
1618 context: &ctx,
1619 node_id: "missing-op-hint",
1620 node: &node,
1621 payload: &payload,
1622 };
1623 let err = rt
1624 .block_on(engine.execute_component_exec(
1625 &ctx,
1626 "missing-op-hint",
1627 &node,
1628 payload.clone(),
1629 &event,
1630 ComponentOverrides {
1631 component: None,
1632 operation: None,
1633 },
1634 ))
1635 .unwrap_err();
1636 let message = err.to_string();
1637 assert!(
1638 message.contains("missing operation for node `missing-op-hint`"),
1639 "unexpected message: {message}"
1640 );
1641 assert!(
1642 message.contains("Found operation in input.mapping (`render`)"),
1643 "unexpected message: {message}"
1644 );
1645 }
1646
1647 struct CountingObserver {
1648 starts: Mutex<Vec<String>>,
1649 ends: Mutex<Vec<Value>>,
1650 }
1651
1652 impl CountingObserver {
1653 fn new() -> Self {
1654 Self {
1655 starts: Mutex::new(Vec::new()),
1656 ends: Mutex::new(Vec::new()),
1657 }
1658 }
1659 }
1660
1661 impl ExecutionObserver for CountingObserver {
1662 fn on_node_start(&self, event: &NodeEvent<'_>) {
1663 self.starts.lock().unwrap().push(event.node_id.to_string());
1664 }
1665
1666 fn on_node_end(&self, _event: &NodeEvent<'_>, output: &Value) {
1667 self.ends.lock().unwrap().push(output.clone());
1668 }
1669
1670 fn on_node_error(&self, _event: &NodeEvent<'_>, _error: &dyn StdError) {}
1671 }
1672
1673 #[test]
1674 fn emits_end_event_for_successful_node() {
1675 let node_id = NodeId::from_str("emit").unwrap();
1676 let node = Node {
1677 id: node_id.clone(),
1678 component: FlowComponentRef {
1679 id: "emit.log".parse().unwrap(),
1680 pack_alias: None,
1681 operation: None,
1682 },
1683 input: InputMapping {
1684 mapping: json!({ "message": "logged" }),
1685 },
1686 output: OutputMapping {
1687 mapping: Value::Null,
1688 },
1689 routing: Routing::End,
1690 telemetry: TelemetryHints::default(),
1691 };
1692 let mut nodes = indexmap::IndexMap::default();
1693 nodes.insert(node_id.clone(), node);
1694 let flow = Flow {
1695 schema_version: "1.0".into(),
1696 id: FlowId::from_str("emit.flow").unwrap(),
1697 kind: FlowKind::Messaging,
1698 entrypoints: BTreeMap::from([(
1699 "default".to_string(),
1700 Value::String(node_id.to_string()),
1701 )]),
1702 nodes,
1703 metadata: Default::default(),
1704 };
1705 let host_flow = HostFlow::from(flow);
1706
1707 let engine = FlowEngine {
1708 packs: Vec::new(),
1709 flows: Vec::new(),
1710 flow_sources: HashMap::new(),
1711 flow_cache: RwLock::new(HashMap::from([(
1712 FlowKey {
1713 pack_id: "test-pack".to_string(),
1714 flow_id: "emit.flow".to_string(),
1715 },
1716 host_flow,
1717 )])),
1718 default_env: "local".to_string(),
1719 validation: ValidationConfig {
1720 mode: ValidationMode::Off,
1721 },
1722 };
1723 let observer = CountingObserver::new();
1724 let ctx = FlowContext {
1725 tenant: "demo",
1726 pack_id: "test-pack",
1727 flow_id: "emit.flow",
1728 node_id: None,
1729 tool: None,
1730 action: None,
1731 session_id: None,
1732 provider_id: None,
1733 retry_config: RetryConfig {
1734 max_attempts: 1,
1735 base_delay_ms: 1,
1736 },
1737 attempt: 1,
1738 observer: Some(&observer),
1739 mocks: None,
1740 };
1741
1742 let rt = Runtime::new().unwrap();
1743 let result = rt.block_on(engine.execute(ctx, Value::Null)).unwrap();
1744 assert!(matches!(result.status, FlowStatus::Completed));
1745
1746 let starts = observer.starts.lock().unwrap();
1747 let ends = observer.ends.lock().unwrap();
1748 assert_eq!(starts.len(), 1);
1749 assert_eq!(ends.len(), 1);
1750 assert_eq!(ends[0], json!({ "message": "logged" }));
1751 }
1752}
1753
1754use tracing::Instrument;
1755
1756pub struct FlowContext<'a> {
1757 pub tenant: &'a str,
1758 pub pack_id: &'a str,
1759 pub flow_id: &'a str,
1760 pub node_id: Option<&'a str>,
1761 pub tool: Option<&'a str>,
1762 pub action: Option<&'a str>,
1763 pub session_id: Option<&'a str>,
1764 pub provider_id: Option<&'a str>,
1765 pub retry_config: RetryConfig,
1766 pub attempt: u32,
1767 pub observer: Option<&'a dyn ExecutionObserver>,
1768 pub mocks: Option<&'a MockLayer>,
1769}
1770
1771#[derive(Copy, Clone)]
1772pub struct RetryConfig {
1773 pub max_attempts: u32,
1774 pub base_delay_ms: u64,
1775}
1776
1777fn should_retry(err: &anyhow::Error) -> bool {
1778 let lower = err.to_string().to_lowercase();
1779 lower.contains("transient")
1780 || lower.contains("unavailable")
1781 || lower.contains("internal")
1782 || lower.contains("timeout")
1783}
1784
1785impl From<FlowRetryConfig> for RetryConfig {
1786 fn from(value: FlowRetryConfig) -> Self {
1787 Self {
1788 max_attempts: value.max_attempts.max(1),
1789 base_delay_ms: value.base_delay_ms.max(50),
1790 }
1791 }
1792}