1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use super::templating::{TemplateOptions, render_template_value};
18use crate::config::{FlowRetryConfig, HostConfig};
19use crate::pack::{FlowDescriptor, PackRuntime};
20use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
21#[cfg(feature = "fault-injection")]
22use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
23use crate::validate::{
24 ValidationConfig, ValidationIssue, ValidationMode, validate_component_envelope,
25 validate_tool_envelope,
26};
27use greentic_types::{Flow, Node, NodeId, Routing};
28
29pub struct FlowEngine {
30 packs: Vec<Arc<PackRuntime>>,
31 flows: Vec<FlowDescriptor>,
32 flow_sources: HashMap<FlowKey, usize>,
33 flow_cache: RwLock<HashMap<FlowKey, HostFlow>>,
34 default_env: String,
35 validation: ValidationConfig,
36}
37
38#[derive(Clone, Debug, PartialEq, Eq, Hash)]
39struct FlowKey {
40 pack_id: String,
41 flow_id: String,
42}
43
44#[derive(Clone, Debug, Serialize, Deserialize)]
45pub struct FlowSnapshot {
46 pub pack_id: String,
47 pub flow_id: String,
48 pub next_node: String,
49 pub state: ExecutionState,
50}
51
52#[derive(Clone, Debug)]
53pub struct FlowWait {
54 pub reason: Option<String>,
55 pub snapshot: FlowSnapshot,
56}
57
58#[derive(Clone, Debug)]
59pub enum FlowStatus {
60 Completed,
61 Waiting(Box<FlowWait>),
62}
63
64#[derive(Clone, Debug)]
65pub struct FlowExecution {
66 pub output: Value,
67 pub status: FlowStatus,
68}
69
70#[derive(Clone, Debug)]
71struct HostFlow {
72 id: String,
73 start: Option<NodeId>,
74 nodes: IndexMap<NodeId, HostNode>,
75}
76
77#[derive(Clone, Debug)]
78pub struct HostNode {
79 kind: NodeKind,
80 pub component: String,
82 component_id: String,
83 operation_name: Option<String>,
84 operation_in_mapping: Option<String>,
85 payload_expr: Value,
86 routing: Routing,
87}
88
89impl HostNode {
90 pub fn component_id(&self) -> &str {
91 &self.component_id
92 }
93
94 pub fn operation_name(&self) -> Option<&str> {
95 self.operation_name.as_deref()
96 }
97
98 pub fn operation_in_mapping(&self) -> Option<&str> {
99 self.operation_in_mapping.as_deref()
100 }
101}
102
103#[derive(Clone, Debug)]
104enum NodeKind {
105 Exec { target_component: String },
106 PackComponent { component_ref: String },
107 ProviderInvoke,
108 FlowCall,
109 BuiltinEmit { kind: EmitKind },
110 Wait,
111}
112
113#[derive(Clone, Debug)]
114enum EmitKind {
115 Log,
116 Response,
117 Other(String),
118}
119
120struct ComponentOverrides<'a> {
121 component: Option<&'a str>,
122 operation: Option<&'a str>,
123}
124
125struct ComponentCall {
126 component_ref: String,
127 operation: String,
128 input: Value,
129 config: Value,
130}
131
132impl FlowExecution {
133 fn completed(output: Value) -> Self {
134 Self {
135 output,
136 status: FlowStatus::Completed,
137 }
138 }
139
140 fn waiting(output: Value, wait: FlowWait) -> Self {
141 Self {
142 output,
143 status: FlowStatus::Waiting(Box::new(wait)),
144 }
145 }
146}
147
148impl FlowEngine {
149 pub async fn new(packs: Vec<Arc<PackRuntime>>, config: Arc<HostConfig>) -> Result<Self> {
150 let mut flow_sources: HashMap<FlowKey, usize> = HashMap::new();
151 let mut descriptors = Vec::new();
152 let mut bindings = HashMap::new();
153 for pack in &config.pack_bindings {
154 bindings.insert(pack.pack_id.clone(), pack.flows.clone());
155 }
156 let enforce_bindings = !bindings.is_empty();
157 for (idx, pack) in packs.iter().enumerate() {
158 let pack_id = pack.metadata().pack_id.clone();
159 if enforce_bindings && !bindings.contains_key(&pack_id) {
160 bail!("no gtbind entries found for pack {}", pack_id);
161 }
162 let flows = pack.list_flows().await?;
163 let allowed = bindings.get(&pack_id).map(|flows| {
164 flows
165 .iter()
166 .cloned()
167 .collect::<std::collections::HashSet<_>>()
168 });
169 let mut seen = std::collections::HashSet::new();
170 for flow in flows {
171 if let Some(ref allow) = allowed
172 && !allow.contains(&flow.id)
173 {
174 continue;
175 }
176 seen.insert(flow.id.clone());
177 tracing::info!(
178 flow_id = %flow.id,
179 flow_type = %flow.flow_type,
180 pack_id = %flow.pack_id,
181 pack_index = idx,
182 "registered flow"
183 );
184 flow_sources.insert(
185 FlowKey {
186 pack_id: flow.pack_id.clone(),
187 flow_id: flow.id.clone(),
188 },
189 idx,
190 );
191 descriptors.retain(|existing: &FlowDescriptor| {
192 !(existing.id == flow.id && existing.pack_id == flow.pack_id)
193 });
194 descriptors.push(flow);
195 }
196 if let Some(allow) = allowed {
197 let missing = allow.difference(&seen).cloned().collect::<Vec<_>>();
198 if !missing.is_empty() {
199 bail!(
200 "gtbind flow ids missing in pack {}: {}",
201 pack_id,
202 missing.join(", ")
203 );
204 }
205 }
206 }
207
208 let mut flow_map = HashMap::new();
209 for flow in &descriptors {
210 let pack_id = flow.pack_id.clone();
211 if let Some(&pack_idx) = flow_sources.get(&FlowKey {
212 pack_id: pack_id.clone(),
213 flow_id: flow.id.clone(),
214 }) {
215 let pack_clone = Arc::clone(&packs[pack_idx]);
216 let flow_id = flow.id.clone();
217 let task_flow_id = flow_id.clone();
218 match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
219 Ok(Ok(loaded_flow)) => {
220 flow_map.insert(
221 FlowKey {
222 pack_id: pack_id.clone(),
223 flow_id,
224 },
225 HostFlow::from(loaded_flow),
226 );
227 }
228 Ok(Err(err)) => {
229 tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
230 }
231 Err(err) => {
232 tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
233 }
234 }
235 }
236 }
237
238 Ok(Self {
239 packs,
240 flows: descriptors,
241 flow_sources,
242 flow_cache: RwLock::new(flow_map),
243 default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
244 validation: config.validation.clone(),
245 })
246 }
247
248 async fn get_or_load_flow(&self, pack_id: &str, flow_id: &str) -> Result<HostFlow> {
249 let key = FlowKey {
250 pack_id: pack_id.to_string(),
251 flow_id: flow_id.to_string(),
252 };
253 if let Some(flow) = self.flow_cache.read().get(&key).cloned() {
254 return Ok(flow);
255 }
256
257 let pack_idx = *self
258 .flow_sources
259 .get(&key)
260 .with_context(|| format!("flow {pack_id}:{flow_id} not registered"))?;
261 let pack = Arc::clone(&self.packs[pack_idx]);
262 let flow_id_owned = flow_id.to_string();
263 let task_flow_id = flow_id_owned.clone();
264 let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
265 .await
266 .context("failed to join flow metadata task")??;
267 let host_flow = HostFlow::from(flow);
268 self.flow_cache.write().insert(
269 FlowKey {
270 pack_id: pack_id.to_string(),
271 flow_id: flow_id_owned.clone(),
272 },
273 host_flow.clone(),
274 );
275 Ok(host_flow)
276 }
277
278 pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
279 let span = tracing::info_span!(
280 "flow.execute",
281 tenant = tracing::field::Empty,
282 flow_id = tracing::field::Empty,
283 node_id = tracing::field::Empty,
284 tool = tracing::field::Empty,
285 action = tracing::field::Empty
286 );
287 annotate_span(
288 &span,
289 &FlowSpanAttributes {
290 tenant: ctx.tenant,
291 flow_id: ctx.flow_id,
292 node_id: ctx.node_id,
293 tool: ctx.tool,
294 action: ctx.action,
295 },
296 );
297 set_flow_context(
298 &self.default_env,
299 ctx.tenant,
300 ctx.flow_id,
301 ctx.node_id,
302 ctx.provider_id,
303 ctx.session_id,
304 );
305 let retry_config = ctx.retry_config;
306 let original_input = input;
307 let mut ctx = ctx;
308 async move {
309 let mut attempt = 0u32;
310 loop {
311 attempt += 1;
312 ctx.attempt = attempt;
313 #[cfg(feature = "fault-injection")]
314 {
315 let fault_ctx = FaultContext {
316 pack_id: ctx.pack_id,
317 flow_id: ctx.flow_id,
318 node_id: ctx.node_id,
319 attempt: ctx.attempt,
320 };
321 maybe_fail(FaultPoint::Timeout, fault_ctx)
322 .map_err(|err| anyhow!(err.to_string()))?;
323 }
324 match self.execute_once(&ctx, original_input.clone()).await {
325 Ok(value) => return Ok(value),
326 Err(err) => {
327 if attempt >= retry_config.max_attempts || !should_retry(&err) {
328 return Err(err);
329 }
330 let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
331 tracing::warn!(
332 tenant = ctx.tenant,
333 flow_id = ctx.flow_id,
334 attempt,
335 max_attempts = retry_config.max_attempts,
336 delay_ms = delay,
337 error = %err,
338 "transient flow execution failure, backing off"
339 );
340 tokio::time::sleep(Duration::from_millis(delay)).await;
341 }
342 }
343 }
344 }
345 .instrument(span)
346 .await
347 }
348
349 pub async fn resume(
350 &self,
351 ctx: FlowContext<'_>,
352 snapshot: FlowSnapshot,
353 input: Value,
354 ) -> Result<FlowExecution> {
355 if snapshot.pack_id != ctx.pack_id {
356 bail!(
357 "snapshot pack {} does not match requested {}",
358 snapshot.pack_id,
359 ctx.pack_id
360 );
361 }
362 if snapshot.flow_id != ctx.flow_id {
363 bail!(
364 "snapshot flow {} does not match requested {}",
365 snapshot.flow_id,
366 ctx.flow_id
367 );
368 }
369 let flow_ir = self.get_or_load_flow(ctx.pack_id, ctx.flow_id).await?;
370 let mut state = snapshot.state;
371 state.replace_input(input);
372 state.ensure_entry();
373 self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
374 .await
375 }
376
377 async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
378 let flow_ir = self.get_or_load_flow(ctx.pack_id, ctx.flow_id).await?;
379 let state = ExecutionState::new(input);
380 self.drive_flow(ctx, flow_ir, state, None).await
381 }
382
383 async fn drive_flow(
384 &self,
385 ctx: &FlowContext<'_>,
386 flow_ir: HostFlow,
387 mut state: ExecutionState,
388 resume_from: Option<String>,
389 ) -> Result<FlowExecution> {
390 let mut current = match resume_from {
391 Some(node) => NodeId::from_str(&node)
392 .with_context(|| format!("invalid resume node id `{node}`"))?,
393 None => flow_ir
394 .start
395 .clone()
396 .or_else(|| flow_ir.nodes.keys().next().cloned())
397 .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
398 };
399
400 loop {
401 let node = flow_ir
402 .nodes
403 .get(¤t)
404 .with_context(|| format!("node {} not found", current.as_str()))?;
405
406 let payload_template = node.payload_expr.clone();
407 let prev = state
408 .last_output
409 .as_ref()
410 .cloned()
411 .unwrap_or_else(|| Value::Object(JsonMap::new()));
412 let ctx_value = template_context(&state, prev);
413 #[cfg(feature = "fault-injection")]
414 {
415 let fault_ctx = FaultContext {
416 pack_id: ctx.pack_id,
417 flow_id: ctx.flow_id,
418 node_id: Some(current.as_str()),
419 attempt: ctx.attempt,
420 };
421 maybe_fail(FaultPoint::TemplateRender, fault_ctx)
422 .map_err(|err| anyhow!(err.to_string()))?;
423 }
424 let payload =
425 render_template_value(&payload_template, &ctx_value, TemplateOptions::default())
426 .context("failed to render node input template")?;
427 let observed_payload = payload.clone();
428 let node_id = current.clone();
429 let event = NodeEvent {
430 context: ctx,
431 node_id: node_id.as_str(),
432 node,
433 payload: &observed_payload,
434 };
435 if let Some(observer) = ctx.observer {
436 observer.on_node_start(&event);
437 }
438 let dispatch = self
439 .dispatch_node(ctx, node_id.as_str(), node, &mut state, payload, &event)
440 .await;
441 let DispatchOutcome {
442 output,
443 wait_reason,
444 } = match dispatch {
445 Ok(outcome) => outcome,
446 Err(err) => {
447 if let Some(observer) = ctx.observer {
448 observer.on_node_error(&event, err.as_ref());
449 }
450 return Err(err);
451 }
452 };
453
454 state.nodes.insert(node_id.clone().into(), output.clone());
455 state.last_output = Some(output.payload.clone());
456 if let Some(observer) = ctx.observer {
457 observer.on_node_end(&event, &output.payload);
458 }
459
460 let (next, should_exit) = match &node.routing {
461 Routing::Next { node_id } => (Some(node_id.clone()), false),
462 Routing::End | Routing::Reply => (None, true),
463 Routing::Branch { default, .. } => (default.clone(), default.is_none()),
464 Routing::Custom(raw) => {
465 tracing::warn!(
466 flow_id = %flow_ir.id,
467 node_id = %node_id,
468 routing = ?raw,
469 "unsupported routing; terminating flow"
470 );
471 (None, true)
472 }
473 };
474
475 if let Some(wait_reason) = wait_reason {
476 let resume_target = next.clone().ok_or_else(|| {
477 anyhow!(
478 "session.wait node {} requires a non-empty route",
479 current.as_str()
480 )
481 })?;
482 let mut snapshot_state = state.clone();
483 snapshot_state.clear_egress();
484 let snapshot = FlowSnapshot {
485 pack_id: ctx.pack_id.to_string(),
486 flow_id: ctx.flow_id.to_string(),
487 next_node: resume_target.as_str().to_string(),
488 state: snapshot_state,
489 };
490 let output_value = state.clone().finalize_with(None);
491 return Ok(FlowExecution::waiting(
492 output_value,
493 FlowWait {
494 reason: Some(wait_reason),
495 snapshot,
496 },
497 ));
498 }
499
500 if should_exit {
501 return Ok(FlowExecution::completed(
502 state.finalize_with(Some(output.payload.clone())),
503 ));
504 }
505
506 match next {
507 Some(n) => current = n,
508 None => {
509 return Ok(FlowExecution::completed(
510 state.finalize_with(Some(output.payload.clone())),
511 ));
512 }
513 }
514 }
515 }
516
517 async fn dispatch_node(
518 &self,
519 ctx: &FlowContext<'_>,
520 node_id: &str,
521 node: &HostNode,
522 state: &mut ExecutionState,
523 payload: Value,
524 event: &NodeEvent<'_>,
525 ) -> Result<DispatchOutcome> {
526 match &node.kind {
527 NodeKind::Exec { target_component } => self
528 .execute_component_exec(
529 ctx,
530 node_id,
531 node,
532 payload,
533 event,
534 ComponentOverrides {
535 component: Some(target_component.as_str()),
536 operation: node.operation_name.as_deref(),
537 },
538 )
539 .await
540 .map(DispatchOutcome::complete),
541 NodeKind::PackComponent { component_ref } => self
542 .execute_component_call(ctx, node_id, node, payload, component_ref.as_str(), event)
543 .await
544 .map(DispatchOutcome::complete),
545 NodeKind::FlowCall => self
546 .execute_flow_call(ctx, payload)
547 .await
548 .map(DispatchOutcome::complete),
549 NodeKind::ProviderInvoke => self
550 .execute_provider_invoke(ctx, node_id, state, payload, event)
551 .await
552 .map(DispatchOutcome::complete),
553 NodeKind::BuiltinEmit { kind } => {
554 match kind {
555 EmitKind::Log | EmitKind::Response => {}
556 EmitKind::Other(component) => {
557 tracing::debug!(%component, "handling emit.* as builtin");
558 }
559 }
560 state.push_egress(payload.clone());
561 Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
562 }
563 NodeKind::Wait => {
564 let reason = extract_wait_reason(&payload);
565 Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
566 }
567 }
568 }
569
570 async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
571 #[derive(Deserialize)]
572 struct FlowCallPayload {
573 #[serde(alias = "flow")]
574 flow_id: String,
575 #[serde(default)]
576 input: Value,
577 }
578
579 let call: FlowCallPayload =
580 serde_json::from_value(payload).context("invalid payload for flow.call node")?;
581 if call.flow_id.trim().is_empty() {
582 bail!("flow.call requires a non-empty flow_id");
583 }
584
585 let sub_input = if call.input.is_null() {
586 Value::Null
587 } else {
588 call.input
589 };
590
591 let flow_id_owned = call.flow_id;
592 let action = "flow.call";
593 let sub_ctx = FlowContext {
594 tenant: ctx.tenant,
595 pack_id: ctx.pack_id,
596 flow_id: flow_id_owned.as_str(),
597 node_id: None,
598 tool: ctx.tool,
599 action: Some(action),
600 session_id: ctx.session_id,
601 provider_id: ctx.provider_id,
602 retry_config: ctx.retry_config,
603 attempt: ctx.attempt,
604 observer: ctx.observer,
605 mocks: ctx.mocks,
606 };
607
608 let execution = Box::pin(self.execute(sub_ctx, sub_input))
609 .await
610 .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
611 match execution.status {
612 FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
613 FlowStatus::Waiting(wait) => bail!(
614 "flow.call cannot pause (flow {} waiting {:?})",
615 flow_id_owned,
616 wait.reason
617 ),
618 }
619 }
620
621 async fn execute_component_exec(
622 &self,
623 ctx: &FlowContext<'_>,
624 node_id: &str,
625 node: &HostNode,
626 payload: Value,
627 event: &NodeEvent<'_>,
628 overrides: ComponentOverrides<'_>,
629 ) -> Result<NodeOutput> {
630 #[derive(Deserialize)]
631 struct ComponentPayload {
632 #[serde(default, alias = "component_ref", alias = "component")]
633 component: Option<String>,
634 #[serde(alias = "op")]
635 operation: Option<String>,
636 #[serde(default)]
637 input: Value,
638 #[serde(default)]
639 config: Value,
640 }
641
642 let payload: ComponentPayload =
643 serde_json::from_value(payload).context("invalid payload for component.exec")?;
644 let component_ref = overrides
645 .component
646 .map(str::to_string)
647 .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
648 .with_context(|| "component.exec requires a component_ref")?;
649 let operation = resolve_component_operation(
650 node_id,
651 node.component_id.as_str(),
652 payload.operation,
653 overrides.operation,
654 node.operation_in_mapping.as_deref(),
655 )?;
656 let call = ComponentCall {
657 component_ref,
658 operation,
659 input: payload.input,
660 config: payload.config,
661 };
662
663 self.invoke_component_call(ctx, node_id, call, event).await
664 }
665
666 async fn execute_component_call(
667 &self,
668 ctx: &FlowContext<'_>,
669 node_id: &str,
670 node: &HostNode,
671 payload: Value,
672 component_ref: &str,
673 event: &NodeEvent<'_>,
674 ) -> Result<NodeOutput> {
675 let payload_operation = extract_operation_from_mapping(&payload);
676 let (input, config) = split_operation_payload(payload);
677 let operation = resolve_component_operation(
678 node_id,
679 node.component_id.as_str(),
680 payload_operation,
681 node.operation_name.as_deref(),
682 node.operation_in_mapping.as_deref(),
683 )?;
684 let call = ComponentCall {
685 component_ref: component_ref.to_string(),
686 operation,
687 input,
688 config,
689 };
690 self.invoke_component_call(ctx, node_id, call, event).await
691 }
692
693 async fn invoke_component_call(
694 &self,
695 ctx: &FlowContext<'_>,
696 node_id: &str,
697 call: ComponentCall,
698 event: &NodeEvent<'_>,
699 ) -> Result<NodeOutput> {
700 self.validate_component(ctx, event, &call)?;
701 let input_json = serde_json::to_string(&call.input)?;
702 let config_json = if call.config.is_null() {
703 None
704 } else {
705 Some(serde_json::to_string(&call.config)?)
706 };
707
708 let key = FlowKey {
709 pack_id: ctx.pack_id.to_string(),
710 flow_id: ctx.flow_id.to_string(),
711 };
712 let pack_idx = *self.flow_sources.get(&key).with_context(|| {
713 format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
714 })?;
715 let pack = Arc::clone(&self.packs[pack_idx]);
716 let exec_ctx = component_exec_ctx(ctx, node_id);
717 #[cfg(feature = "fault-injection")]
718 {
719 let fault_ctx = FaultContext {
720 pack_id: ctx.pack_id,
721 flow_id: ctx.flow_id,
722 node_id: Some(node_id),
723 attempt: ctx.attempt,
724 };
725 maybe_fail(FaultPoint::BeforeComponentCall, fault_ctx)
726 .map_err(|err| anyhow!(err.to_string()))?;
727 }
728 let value = pack
729 .invoke_component(
730 call.component_ref.as_str(),
731 exec_ctx,
732 call.operation.as_str(),
733 config_json,
734 input_json,
735 )
736 .await?;
737 #[cfg(feature = "fault-injection")]
738 {
739 let fault_ctx = FaultContext {
740 pack_id: ctx.pack_id,
741 flow_id: ctx.flow_id,
742 node_id: Some(node_id),
743 attempt: ctx.attempt,
744 };
745 maybe_fail(FaultPoint::AfterComponentCall, fault_ctx)
746 .map_err(|err| anyhow!(err.to_string()))?;
747 }
748
749 if let Some((code, message)) = component_error(&value) {
750 bail!(
751 "component {} failed: {}: {}",
752 call.component_ref,
753 code,
754 message
755 );
756 }
757 Ok(NodeOutput::new(value))
758 }
759
760 async fn execute_provider_invoke(
761 &self,
762 ctx: &FlowContext<'_>,
763 node_id: &str,
764 state: &ExecutionState,
765 payload: Value,
766 event: &NodeEvent<'_>,
767 ) -> Result<NodeOutput> {
768 #[derive(Deserialize)]
769 struct ProviderPayload {
770 #[serde(default)]
771 provider_id: Option<String>,
772 #[serde(default)]
773 provider_type: Option<String>,
774 #[serde(default, alias = "operation")]
775 op: Option<String>,
776 #[serde(default)]
777 input: Value,
778 #[serde(default)]
779 in_map: Value,
780 #[serde(default)]
781 out_map: Value,
782 #[serde(default)]
783 err_map: Value,
784 }
785
786 let payload: ProviderPayload =
787 serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
788 let op = payload
789 .op
790 .as_deref()
791 .filter(|v| !v.trim().is_empty())
792 .with_context(|| "provider.invoke requires an op")?
793 .to_string();
794
795 let prev = state
796 .last_output
797 .as_ref()
798 .cloned()
799 .unwrap_or_else(|| Value::Object(JsonMap::new()));
800 let base_ctx = template_context(state, prev);
801
802 let input_value = if !payload.in_map.is_null() {
803 let mut ctx_value = base_ctx.clone();
804 if let Value::Object(ref mut map) = ctx_value {
805 map.insert("input".into(), payload.input.clone());
806 map.insert("result".into(), payload.input.clone());
807 }
808 render_template_value(
809 &payload.in_map,
810 &ctx_value,
811 TemplateOptions {
812 allow_pointer: true,
813 },
814 )
815 .context("failed to render provider.invoke in_map")?
816 } else if !payload.input.is_null() {
817 payload.input
818 } else {
819 Value::Null
820 };
821 let input_json = serde_json::to_vec(&input_value)?;
822
823 self.validate_tool(
824 ctx,
825 event,
826 payload.provider_id.as_deref(),
827 payload.provider_type.as_deref(),
828 &op,
829 &input_value,
830 )?;
831
832 let key = FlowKey {
833 pack_id: ctx.pack_id.to_string(),
834 flow_id: ctx.flow_id.to_string(),
835 };
836 let pack_idx = *self.flow_sources.get(&key).with_context(|| {
837 format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
838 })?;
839 let pack = Arc::clone(&self.packs[pack_idx]);
840 let binding = pack.resolve_provider(
841 payload.provider_id.as_deref(),
842 payload.provider_type.as_deref(),
843 )?;
844 let exec_ctx = component_exec_ctx(ctx, node_id);
845 #[cfg(feature = "fault-injection")]
846 {
847 let fault_ctx = FaultContext {
848 pack_id: ctx.pack_id,
849 flow_id: ctx.flow_id,
850 node_id: Some(node_id),
851 attempt: ctx.attempt,
852 };
853 maybe_fail(FaultPoint::BeforeToolCall, fault_ctx)
854 .map_err(|err| anyhow!(err.to_string()))?;
855 }
856 let result = pack
857 .invoke_provider(&binding, exec_ctx, &op, input_json)
858 .await?;
859 #[cfg(feature = "fault-injection")]
860 {
861 let fault_ctx = FaultContext {
862 pack_id: ctx.pack_id,
863 flow_id: ctx.flow_id,
864 node_id: Some(node_id),
865 attempt: ctx.attempt,
866 };
867 maybe_fail(FaultPoint::AfterToolCall, fault_ctx)
868 .map_err(|err| anyhow!(err.to_string()))?;
869 }
870
871 let output = if payload.out_map.is_null() {
872 result
873 } else {
874 let mut ctx_value = base_ctx;
875 if let Value::Object(ref mut map) = ctx_value {
876 map.insert("input".into(), result.clone());
877 map.insert("result".into(), result.clone());
878 }
879 render_template_value(
880 &payload.out_map,
881 &ctx_value,
882 TemplateOptions {
883 allow_pointer: true,
884 },
885 )
886 .context("failed to render provider.invoke out_map")?
887 };
888 let _ = payload.err_map;
889 Ok(NodeOutput::new(output))
890 }
891
892 fn validate_component(
893 &self,
894 ctx: &FlowContext<'_>,
895 event: &NodeEvent<'_>,
896 call: &ComponentCall,
897 ) -> Result<()> {
898 if self.validation.mode == ValidationMode::Off {
899 return Ok(());
900 }
901 let mut metadata = JsonMap::new();
902 metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
903 if let Some(id) = ctx.session_id {
904 metadata.insert("session".to_string(), json!({ "id": id }));
905 }
906 let envelope = json!({
907 "component_id": call.component_ref,
908 "operation": call.operation,
909 "input": call.input,
910 "config": call.config,
911 "metadata": Value::Object(metadata),
912 });
913 let issues = validate_component_envelope(&envelope);
914 self.report_validation(ctx, event, "component", issues)
915 }
916
917 fn validate_tool(
918 &self,
919 ctx: &FlowContext<'_>,
920 event: &NodeEvent<'_>,
921 provider_id: Option<&str>,
922 provider_type: Option<&str>,
923 operation: &str,
924 input: &Value,
925 ) -> Result<()> {
926 if self.validation.mode == ValidationMode::Off {
927 return Ok(());
928 }
929 let tool_id = provider_id.or(provider_type).unwrap_or("provider.invoke");
930 let mut metadata = JsonMap::new();
931 metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
932 if let Some(id) = ctx.session_id {
933 metadata.insert("session".to_string(), json!({ "id": id }));
934 }
935 let envelope = json!({
936 "tool_id": tool_id,
937 "operation": operation,
938 "input": input,
939 "metadata": Value::Object(metadata),
940 });
941 let issues = validate_tool_envelope(&envelope);
942 self.report_validation(ctx, event, "tool", issues)
943 }
944
945 fn report_validation(
946 &self,
947 ctx: &FlowContext<'_>,
948 event: &NodeEvent<'_>,
949 kind: &str,
950 issues: Vec<ValidationIssue>,
951 ) -> Result<()> {
952 if issues.is_empty() {
953 return Ok(());
954 }
955 if let Some(observer) = ctx.observer {
956 observer.on_validation(event, &issues);
957 }
958 match self.validation.mode {
959 ValidationMode::Warn => {
960 tracing::warn!(
961 tenant = ctx.tenant,
962 flow_id = ctx.flow_id,
963 node_id = event.node_id,
964 kind,
965 issues = ?issues,
966 "invocation envelope validation issues"
967 );
968 Ok(())
969 }
970 ValidationMode::Error => {
971 tracing::error!(
972 tenant = ctx.tenant,
973 flow_id = ctx.flow_id,
974 node_id = event.node_id,
975 kind,
976 issues = ?issues,
977 "invocation envelope validation failed"
978 );
979 bail!("invocation_validation_failed");
980 }
981 ValidationMode::Off => Ok(()),
982 }
983 }
984
985 pub fn flows(&self) -> &[FlowDescriptor] {
986 &self.flows
987 }
988
989 pub fn flow_by_key(&self, pack_id: &str, flow_id: &str) -> Option<&FlowDescriptor> {
990 self.flows
991 .iter()
992 .find(|descriptor| descriptor.pack_id == pack_id && descriptor.id == flow_id)
993 }
994
995 pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
996 let mut matches = self
997 .flows
998 .iter()
999 .filter(|descriptor| descriptor.flow_type == flow_type);
1000 let first = matches.next()?;
1001 if matches.next().is_some() {
1002 return None;
1003 }
1004 Some(first)
1005 }
1006
1007 pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
1008 let mut matches = self
1009 .flows
1010 .iter()
1011 .filter(|descriptor| descriptor.id == flow_id);
1012 let first = matches.next()?;
1013 if matches.next().is_some() {
1014 return None;
1015 }
1016 Some(first)
1017 }
1018}
1019
1020pub trait ExecutionObserver: Send + Sync {
1021 fn on_node_start(&self, event: &NodeEvent<'_>);
1022 fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
1023 fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
1024 fn on_validation(&self, _event: &NodeEvent<'_>, _issues: &[ValidationIssue]) {}
1025}
1026
1027pub struct NodeEvent<'a> {
1028 pub context: &'a FlowContext<'a>,
1029 pub node_id: &'a str,
1030 pub node: &'a HostNode,
1031 pub payload: &'a Value,
1032}
1033
1034#[derive(Clone, Debug, Serialize, Deserialize)]
1035pub struct ExecutionState {
1036 #[serde(default)]
1037 entry: Value,
1038 #[serde(default)]
1039 input: Value,
1040 #[serde(default)]
1041 nodes: HashMap<String, NodeOutput>,
1042 #[serde(default)]
1043 egress: Vec<Value>,
1044 #[serde(default, skip_serializing_if = "Option::is_none")]
1045 last_output: Option<Value>,
1046}
1047
1048impl ExecutionState {
1049 fn new(input: Value) -> Self {
1050 Self {
1051 entry: input.clone(),
1052 input,
1053 nodes: HashMap::new(),
1054 egress: Vec::new(),
1055 last_output: None,
1056 }
1057 }
1058
1059 fn ensure_entry(&mut self) {
1060 if self.entry.is_null() {
1061 self.entry = self.input.clone();
1062 }
1063 }
1064
1065 fn context(&self) -> Value {
1066 let mut nodes = JsonMap::new();
1067 for (id, output) in &self.nodes {
1068 nodes.insert(
1069 id.clone(),
1070 json!({
1071 "ok": output.ok,
1072 "payload": output.payload.clone(),
1073 "meta": output.meta.clone(),
1074 }),
1075 );
1076 }
1077 json!({
1078 "entry": self.entry.clone(),
1079 "input": self.input.clone(),
1080 "nodes": nodes,
1081 })
1082 }
1083
1084 fn outputs_map(&self) -> JsonMap<String, Value> {
1085 let mut outputs = JsonMap::new();
1086 for (id, output) in &self.nodes {
1087 outputs.insert(id.clone(), output.payload.clone());
1088 }
1089 outputs
1090 }
1091 fn push_egress(&mut self, payload: Value) {
1092 self.egress.push(payload);
1093 }
1094
1095 fn replace_input(&mut self, input: Value) {
1096 self.input = input;
1097 }
1098
1099 fn clear_egress(&mut self) {
1100 self.egress.clear();
1101 }
1102
1103 fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
1104 if self.egress.is_empty() {
1105 return final_payload.unwrap_or(Value::Null);
1106 }
1107 let mut emitted = std::mem::take(&mut self.egress);
1108 if let Some(value) = final_payload {
1109 match value {
1110 Value::Null => {}
1111 Value::Array(items) => emitted.extend(items),
1112 other => emitted.push(other),
1113 }
1114 }
1115 Value::Array(emitted)
1116 }
1117}
1118
1119#[derive(Clone, Debug, Serialize, Deserialize)]
1120struct NodeOutput {
1121 ok: bool,
1122 payload: Value,
1123 meta: Value,
1124}
1125
1126impl NodeOutput {
1127 fn new(payload: Value) -> Self {
1128 Self {
1129 ok: true,
1130 payload,
1131 meta: Value::Null,
1132 }
1133 }
1134}
1135
1136struct DispatchOutcome {
1137 output: NodeOutput,
1138 wait_reason: Option<String>,
1139}
1140
1141impl DispatchOutcome {
1142 fn complete(output: NodeOutput) -> Self {
1143 Self {
1144 output,
1145 wait_reason: None,
1146 }
1147 }
1148
1149 fn wait(output: NodeOutput, reason: Option<String>) -> Self {
1150 Self {
1151 output,
1152 wait_reason: reason,
1153 }
1154 }
1155}
1156
1157fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
1158 ComponentExecCtx {
1159 tenant: ComponentTenantCtx {
1160 tenant: ctx.tenant.to_string(),
1161 team: None,
1162 user: ctx.provider_id.map(str::to_string),
1163 trace_id: None,
1164 correlation_id: ctx.session_id.map(str::to_string),
1165 deadline_unix_ms: None,
1166 attempt: ctx.attempt,
1167 idempotency_key: ctx.session_id.map(str::to_string),
1168 },
1169 flow_id: ctx.flow_id.to_string(),
1170 node_id: Some(node_id.to_string()),
1171 }
1172}
1173
1174fn component_error(value: &Value) -> Option<(String, String)> {
1175 let obj = value.as_object()?;
1176 let ok = obj.get("ok").and_then(Value::as_bool)?;
1177 if ok {
1178 return None;
1179 }
1180 let err = obj.get("error")?.as_object()?;
1181 let code = err
1182 .get("code")
1183 .and_then(Value::as_str)
1184 .unwrap_or("component_error");
1185 let message = err
1186 .get("message")
1187 .and_then(Value::as_str)
1188 .unwrap_or("component reported error");
1189 Some((code.to_string(), message.to_string()))
1190}
1191
1192fn extract_wait_reason(payload: &Value) -> Option<String> {
1193 match payload {
1194 Value::String(s) => Some(s.clone()),
1195 Value::Object(map) => map
1196 .get("reason")
1197 .and_then(Value::as_str)
1198 .map(|value| value.to_string()),
1199 _ => None,
1200 }
1201}
1202
1203fn template_context(state: &ExecutionState, prev: Value) -> Value {
1204 let entry = if state.entry.is_null() {
1205 Value::Object(JsonMap::new())
1206 } else {
1207 state.entry.clone()
1208 };
1209 let mut ctx = JsonMap::new();
1210 ctx.insert("entry".into(), entry);
1211 ctx.insert("prev".into(), prev);
1212 ctx.insert("node".into(), Value::Object(state.outputs_map()));
1213 ctx.insert("state".into(), state.context());
1214 Value::Object(ctx)
1215}
1216
1217impl From<Flow> for HostFlow {
1218 fn from(value: Flow) -> Self {
1219 let mut nodes = IndexMap::new();
1220 for (id, node) in value.nodes {
1221 nodes.insert(id.clone(), HostNode::from(node));
1222 }
1223 let start = value
1224 .entrypoints
1225 .get("default")
1226 .and_then(Value::as_str)
1227 .and_then(|id| NodeId::from_str(id).ok())
1228 .or_else(|| nodes.keys().next().cloned());
1229 Self {
1230 id: value.id.as_str().to_string(),
1231 start,
1232 nodes,
1233 }
1234 }
1235}
1236
1237impl From<Node> for HostNode {
1238 fn from(node: Node) -> Self {
1239 let component_ref = node.component.id.as_str().to_string();
1240 let raw_operation = node.component.operation.clone();
1241 let operation_in_mapping = extract_operation_from_mapping(&node.input.mapping);
1242 let operation_is_component_exec = raw_operation.as_deref() == Some("component.exec");
1243 let operation_is_emit = raw_operation
1244 .as_deref()
1245 .map(|op| op.starts_with("emit."))
1246 .unwrap_or(false);
1247 let is_component_exec = component_ref == "component.exec" || operation_is_component_exec;
1248
1249 let kind = if is_component_exec {
1250 let target = if component_ref == "component.exec" {
1251 if let Some(op) = raw_operation
1252 .as_deref()
1253 .filter(|op| op.starts_with("emit."))
1254 {
1255 op.to_string()
1256 } else {
1257 extract_target_component(&node.input.mapping)
1258 .unwrap_or_else(|| "component.exec".to_string())
1259 }
1260 } else {
1261 extract_target_component(&node.input.mapping)
1262 .unwrap_or_else(|| component_ref.clone())
1263 };
1264 if target.starts_with("emit.") {
1265 NodeKind::BuiltinEmit {
1266 kind: emit_kind_from_ref(&target),
1267 }
1268 } else {
1269 NodeKind::Exec {
1270 target_component: target,
1271 }
1272 }
1273 } else if operation_is_emit {
1274 NodeKind::BuiltinEmit {
1275 kind: emit_kind_from_ref(raw_operation.as_deref().unwrap_or("emit.log")),
1276 }
1277 } else {
1278 match component_ref.as_str() {
1279 "flow.call" => NodeKind::FlowCall,
1280 "provider.invoke" => NodeKind::ProviderInvoke,
1281 "session.wait" => NodeKind::Wait,
1282 comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
1283 kind: emit_kind_from_ref(comp),
1284 },
1285 other => NodeKind::PackComponent {
1286 component_ref: other.to_string(),
1287 },
1288 }
1289 };
1290 let component_label = match &kind {
1291 NodeKind::Exec { .. } => "component.exec".to_string(),
1292 NodeKind::PackComponent { component_ref } => component_ref.clone(),
1293 NodeKind::ProviderInvoke => "provider.invoke".to_string(),
1294 NodeKind::FlowCall => "flow.call".to_string(),
1295 NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
1296 NodeKind::Wait => "session.wait".to_string(),
1297 };
1298 let operation_name = if is_component_exec && operation_is_component_exec {
1299 None
1300 } else {
1301 raw_operation.clone()
1302 };
1303 let payload_expr = match kind {
1304 NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
1305 _ => node.input.mapping.clone(),
1306 };
1307 Self {
1308 kind,
1309 component: component_label,
1310 component_id: if is_component_exec {
1311 "component.exec".to_string()
1312 } else {
1313 component_ref
1314 },
1315 operation_name,
1316 operation_in_mapping,
1317 payload_expr,
1318 routing: node.routing,
1319 }
1320 }
1321}
1322
1323fn extract_target_component(payload: &Value) -> Option<String> {
1324 match payload {
1325 Value::Object(map) => map
1326 .get("component")
1327 .or_else(|| map.get("component_ref"))
1328 .and_then(Value::as_str)
1329 .map(|s| s.to_string()),
1330 _ => None,
1331 }
1332}
1333
1334fn extract_operation_from_mapping(payload: &Value) -> Option<String> {
1335 match payload {
1336 Value::Object(map) => map
1337 .get("operation")
1338 .or_else(|| map.get("op"))
1339 .and_then(Value::as_str)
1340 .map(str::trim)
1341 .filter(|value| !value.is_empty())
1342 .map(|value| value.to_string()),
1343 _ => None,
1344 }
1345}
1346
1347fn extract_emit_payload(payload: &Value) -> Value {
1348 if let Value::Object(map) = payload {
1349 if let Some(input) = map.get("input") {
1350 return input.clone();
1351 }
1352 if let Some(inner) = map.get("payload") {
1353 return inner.clone();
1354 }
1355 }
1356 payload.clone()
1357}
1358
1359fn split_operation_payload(payload: Value) -> (Value, Value) {
1360 if let Value::Object(mut map) = payload.clone()
1361 && map.contains_key("input")
1362 {
1363 let input = map.remove("input").unwrap_or(Value::Null);
1364 let config = map.remove("config").unwrap_or(Value::Null);
1365 let legacy_only = map.keys().all(|key| {
1366 matches!(
1367 key.as_str(),
1368 "operation" | "op" | "component" | "component_ref"
1369 )
1370 });
1371 if legacy_only {
1372 return (input, config);
1373 }
1374 }
1375 (payload, Value::Null)
1376}
1377
1378fn resolve_component_operation(
1379 node_id: &str,
1380 component_label: &str,
1381 payload_operation: Option<String>,
1382 operation_override: Option<&str>,
1383 operation_in_mapping: Option<&str>,
1384) -> Result<String> {
1385 if let Some(op) = operation_override
1386 .map(str::trim)
1387 .filter(|value| !value.is_empty())
1388 {
1389 return Ok(op.to_string());
1390 }
1391
1392 if let Some(op) = payload_operation
1393 .as_deref()
1394 .map(str::trim)
1395 .filter(|value| !value.is_empty())
1396 {
1397 return Ok(op.to_string());
1398 }
1399
1400 let mut message = format!(
1401 "missing operation for node `{}` (component `{}`); expected node.component.operation to be set",
1402 node_id, component_label,
1403 );
1404 if let Some(found) = operation_in_mapping {
1405 message.push_str(&format!(
1406 ". Found operation in input.mapping (`{}`) but this is not used; pack compiler must preserve node.component.operation.",
1407 found
1408 ));
1409 }
1410 bail!(message);
1411}
1412
1413fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
1414 match component_ref {
1415 "emit.log" => EmitKind::Log,
1416 "emit.response" => EmitKind::Response,
1417 other => EmitKind::Other(other.to_string()),
1418 }
1419}
1420
1421fn emit_ref_from_kind(kind: &EmitKind) -> String {
1422 match kind {
1423 EmitKind::Log => "emit.log".to_string(),
1424 EmitKind::Response => "emit.response".to_string(),
1425 EmitKind::Other(other) => other.clone(),
1426 }
1427}
1428
1429#[cfg(test)]
1430mod tests {
1431 use super::*;
1432 use crate::validate::{ValidationConfig, ValidationMode};
1433 use greentic_types::{
1434 Flow, FlowComponentRef, FlowId, FlowKind, InputMapping, Node, NodeId, OutputMapping,
1435 Routing, TelemetryHints,
1436 };
1437 use serde_json::json;
1438 use std::collections::BTreeMap;
1439 use std::str::FromStr;
1440 use std::sync::Mutex;
1441 use tokio::runtime::Runtime;
1442
1443 fn minimal_engine() -> FlowEngine {
1444 FlowEngine {
1445 packs: Vec::new(),
1446 flows: Vec::new(),
1447 flow_sources: HashMap::new(),
1448 flow_cache: RwLock::new(HashMap::new()),
1449 default_env: "local".to_string(),
1450 validation: ValidationConfig {
1451 mode: ValidationMode::Off,
1452 },
1453 }
1454 }
1455
1456 #[test]
1457 fn templating_renders_with_partials_and_data() {
1458 let mut state = ExecutionState::new(json!({ "city": "London" }));
1459 state.nodes.insert(
1460 "forecast".to_string(),
1461 NodeOutput::new(json!({ "temp": "20C" })),
1462 );
1463
1464 let ctx = state.context();
1466 assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
1467 }
1468
1469 #[test]
1470 fn finalize_wraps_emitted_payloads() {
1471 let mut state = ExecutionState::new(json!({}));
1472 state.push_egress(json!({ "text": "first" }));
1473 state.push_egress(json!({ "text": "second" }));
1474 let result = state.finalize_with(Some(json!({ "text": "final" })));
1475 assert_eq!(
1476 result,
1477 json!([
1478 { "text": "first" },
1479 { "text": "second" },
1480 { "text": "final" }
1481 ])
1482 );
1483 }
1484
1485 #[test]
1486 fn finalize_flattens_final_array() {
1487 let mut state = ExecutionState::new(json!({}));
1488 state.push_egress(json!({ "text": "only" }));
1489 let result = state.finalize_with(Some(json!([
1490 { "text": "extra-1" },
1491 { "text": "extra-2" }
1492 ])));
1493 assert_eq!(
1494 result,
1495 json!([
1496 { "text": "only" },
1497 { "text": "extra-1" },
1498 { "text": "extra-2" }
1499 ])
1500 );
1501 }
1502
1503 #[test]
1504 fn missing_operation_reports_node_and_component() {
1505 let engine = minimal_engine();
1506 let rt = Runtime::new().unwrap();
1507 let retry_config = RetryConfig {
1508 max_attempts: 1,
1509 base_delay_ms: 1,
1510 };
1511 let ctx = FlowContext {
1512 tenant: "tenant",
1513 pack_id: "test-pack",
1514 flow_id: "flow",
1515 node_id: Some("missing-op"),
1516 tool: None,
1517 action: None,
1518 session_id: None,
1519 provider_id: None,
1520 retry_config,
1521 attempt: 1,
1522 observer: None,
1523 mocks: None,
1524 };
1525 let node = HostNode {
1526 kind: NodeKind::Exec {
1527 target_component: "qa.process".into(),
1528 },
1529 component: "component.exec".into(),
1530 component_id: "component.exec".into(),
1531 operation_name: None,
1532 operation_in_mapping: None,
1533 payload_expr: Value::Null,
1534 routing: Routing::End,
1535 };
1536 let _state = ExecutionState::new(Value::Null);
1537 let payload = json!({ "component": "qa.process" });
1538 let event = NodeEvent {
1539 context: &ctx,
1540 node_id: "missing-op",
1541 node: &node,
1542 payload: &payload,
1543 };
1544 let err = rt
1545 .block_on(engine.execute_component_exec(
1546 &ctx,
1547 "missing-op",
1548 &node,
1549 payload.clone(),
1550 &event,
1551 ComponentOverrides {
1552 component: None,
1553 operation: None,
1554 },
1555 ))
1556 .unwrap_err();
1557 let message = err.to_string();
1558 assert!(
1559 message.contains("missing operation for node `missing-op`"),
1560 "unexpected message: {message}"
1561 );
1562 assert!(
1563 message.contains("(component `component.exec`)"),
1564 "unexpected message: {message}"
1565 );
1566 }
1567
1568 #[test]
1569 fn missing_operation_mentions_mapping_hint() {
1570 let engine = minimal_engine();
1571 let rt = Runtime::new().unwrap();
1572 let retry_config = RetryConfig {
1573 max_attempts: 1,
1574 base_delay_ms: 1,
1575 };
1576 let ctx = FlowContext {
1577 tenant: "tenant",
1578 pack_id: "test-pack",
1579 flow_id: "flow",
1580 node_id: Some("missing-op-hint"),
1581 tool: None,
1582 action: None,
1583 session_id: None,
1584 provider_id: None,
1585 retry_config,
1586 attempt: 1,
1587 observer: None,
1588 mocks: None,
1589 };
1590 let node = HostNode {
1591 kind: NodeKind::Exec {
1592 target_component: "qa.process".into(),
1593 },
1594 component: "component.exec".into(),
1595 component_id: "component.exec".into(),
1596 operation_name: None,
1597 operation_in_mapping: Some("render".into()),
1598 payload_expr: Value::Null,
1599 routing: Routing::End,
1600 };
1601 let _state = ExecutionState::new(Value::Null);
1602 let payload = json!({ "component": "qa.process" });
1603 let event = NodeEvent {
1604 context: &ctx,
1605 node_id: "missing-op-hint",
1606 node: &node,
1607 payload: &payload,
1608 };
1609 let err = rt
1610 .block_on(engine.execute_component_exec(
1611 &ctx,
1612 "missing-op-hint",
1613 &node,
1614 payload.clone(),
1615 &event,
1616 ComponentOverrides {
1617 component: None,
1618 operation: None,
1619 },
1620 ))
1621 .unwrap_err();
1622 let message = err.to_string();
1623 assert!(
1624 message.contains("missing operation for node `missing-op-hint`"),
1625 "unexpected message: {message}"
1626 );
1627 assert!(
1628 message.contains("Found operation in input.mapping (`render`)"),
1629 "unexpected message: {message}"
1630 );
1631 }
1632
1633 struct CountingObserver {
1634 starts: Mutex<Vec<String>>,
1635 ends: Mutex<Vec<Value>>,
1636 }
1637
1638 impl CountingObserver {
1639 fn new() -> Self {
1640 Self {
1641 starts: Mutex::new(Vec::new()),
1642 ends: Mutex::new(Vec::new()),
1643 }
1644 }
1645 }
1646
1647 impl ExecutionObserver for CountingObserver {
1648 fn on_node_start(&self, event: &NodeEvent<'_>) {
1649 self.starts.lock().unwrap().push(event.node_id.to_string());
1650 }
1651
1652 fn on_node_end(&self, _event: &NodeEvent<'_>, output: &Value) {
1653 self.ends.lock().unwrap().push(output.clone());
1654 }
1655
1656 fn on_node_error(&self, _event: &NodeEvent<'_>, _error: &dyn StdError) {}
1657 }
1658
1659 #[test]
1660 fn emits_end_event_for_successful_node() {
1661 let node_id = NodeId::from_str("emit").unwrap();
1662 let node = Node {
1663 id: node_id.clone(),
1664 component: FlowComponentRef {
1665 id: "emit.log".parse().unwrap(),
1666 pack_alias: None,
1667 operation: None,
1668 },
1669 input: InputMapping {
1670 mapping: json!({ "message": "logged" }),
1671 },
1672 output: OutputMapping {
1673 mapping: Value::Null,
1674 },
1675 routing: Routing::End,
1676 telemetry: TelemetryHints::default(),
1677 };
1678 let mut nodes = indexmap::IndexMap::default();
1679 nodes.insert(node_id.clone(), node);
1680 let flow = Flow {
1681 schema_version: "1.0".into(),
1682 id: FlowId::from_str("emit.flow").unwrap(),
1683 kind: FlowKind::Messaging,
1684 entrypoints: BTreeMap::from([(
1685 "default".to_string(),
1686 Value::String(node_id.to_string()),
1687 )]),
1688 nodes,
1689 metadata: Default::default(),
1690 };
1691 let host_flow = HostFlow::from(flow);
1692
1693 let engine = FlowEngine {
1694 packs: Vec::new(),
1695 flows: Vec::new(),
1696 flow_sources: HashMap::new(),
1697 flow_cache: RwLock::new(HashMap::from([(
1698 FlowKey {
1699 pack_id: "test-pack".to_string(),
1700 flow_id: "emit.flow".to_string(),
1701 },
1702 host_flow,
1703 )])),
1704 default_env: "local".to_string(),
1705 validation: ValidationConfig {
1706 mode: ValidationMode::Off,
1707 },
1708 };
1709 let observer = CountingObserver::new();
1710 let ctx = FlowContext {
1711 tenant: "demo",
1712 pack_id: "test-pack",
1713 flow_id: "emit.flow",
1714 node_id: None,
1715 tool: None,
1716 action: None,
1717 session_id: None,
1718 provider_id: None,
1719 retry_config: RetryConfig {
1720 max_attempts: 1,
1721 base_delay_ms: 1,
1722 },
1723 attempt: 1,
1724 observer: Some(&observer),
1725 mocks: None,
1726 };
1727
1728 let rt = Runtime::new().unwrap();
1729 let result = rt.block_on(engine.execute(ctx, Value::Null)).unwrap();
1730 assert!(matches!(result.status, FlowStatus::Completed));
1731
1732 let starts = observer.starts.lock().unwrap();
1733 let ends = observer.ends.lock().unwrap();
1734 assert_eq!(starts.len(), 1);
1735 assert_eq!(ends.len(), 1);
1736 assert_eq!(ends[0], json!({ "message": "logged" }));
1737 }
1738}
1739
1740use tracing::Instrument;
1741
1742pub struct FlowContext<'a> {
1743 pub tenant: &'a str,
1744 pub pack_id: &'a str,
1745 pub flow_id: &'a str,
1746 pub node_id: Option<&'a str>,
1747 pub tool: Option<&'a str>,
1748 pub action: Option<&'a str>,
1749 pub session_id: Option<&'a str>,
1750 pub provider_id: Option<&'a str>,
1751 pub retry_config: RetryConfig,
1752 pub attempt: u32,
1753 pub observer: Option<&'a dyn ExecutionObserver>,
1754 pub mocks: Option<&'a MockLayer>,
1755}
1756
1757#[derive(Copy, Clone)]
1758pub struct RetryConfig {
1759 pub max_attempts: u32,
1760 pub base_delay_ms: u64,
1761}
1762
1763fn should_retry(err: &anyhow::Error) -> bool {
1764 let lower = err.to_string().to_lowercase();
1765 lower.contains("transient")
1766 || lower.contains("unavailable")
1767 || lower.contains("internal")
1768 || lower.contains("timeout")
1769}
1770
1771impl From<FlowRetryConfig> for RetryConfig {
1772 fn from(value: FlowRetryConfig) -> Self {
1773 Self {
1774 max_attempts: value.max_attempts.max(1),
1775 base_delay_ms: value.base_delay_ms.max(50),
1776 }
1777 }
1778}