1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use super::templating::{TemplateOptions, render_template_value};
18use crate::config::{FlowRetryConfig, HostConfig};
19use crate::pack::{FlowDescriptor, PackRuntime};
20use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
21use crate::validate::{
22 ValidationConfig, ValidationIssue, ValidationMode, validate_component_envelope,
23 validate_tool_envelope,
24};
25use greentic_types::{Flow, Node, NodeId, Routing};
26
27pub struct FlowEngine {
28 packs: Vec<Arc<PackRuntime>>,
29 flows: Vec<FlowDescriptor>,
30 flow_sources: HashMap<FlowKey, usize>,
31 flow_cache: RwLock<HashMap<FlowKey, HostFlow>>,
32 default_env: String,
33 validation: ValidationConfig,
34}
35
/// Lookup key identifying a flow by the pack that owns it and its flow id.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct FlowKey {
    /// Identifier of the owning pack.
    pack_id: String,
    /// Identifier of the flow inside that pack.
    flow_id: String,
}
41
42#[derive(Clone, Debug, Serialize, Deserialize)]
43pub struct FlowSnapshot {
44 pub pack_id: String,
45 pub flow_id: String,
46 pub next_node: String,
47 pub state: ExecutionState,
48}
49
50#[derive(Clone, Debug)]
51pub struct FlowWait {
52 pub reason: Option<String>,
53 pub snapshot: FlowSnapshot,
54}
55
56#[derive(Clone, Debug)]
57pub enum FlowStatus {
58 Completed,
59 Waiting(Box<FlowWait>),
60}
61
62#[derive(Clone, Debug)]
63pub struct FlowExecution {
64 pub output: Value,
65 pub status: FlowStatus,
66}
67
68#[derive(Clone, Debug)]
69struct HostFlow {
70 id: String,
71 start: Option<NodeId>,
72 nodes: IndexMap<NodeId, HostNode>,
73}
74
75#[derive(Clone, Debug)]
76pub struct HostNode {
77 kind: NodeKind,
78 pub component: String,
80 component_id: String,
81 operation_name: Option<String>,
82 operation_in_mapping: Option<String>,
83 payload_expr: Value,
84 routing: Routing,
85}
86
87impl HostNode {
88 pub fn component_id(&self) -> &str {
89 &self.component_id
90 }
91
92 pub fn operation_name(&self) -> Option<&str> {
93 self.operation_name.as_deref()
94 }
95
96 pub fn operation_in_mapping(&self) -> Option<&str> {
97 self.operation_in_mapping.as_deref()
98 }
99}
100
101#[derive(Clone, Debug)]
102enum NodeKind {
103 Exec { target_component: String },
104 PackComponent { component_ref: String },
105 ProviderInvoke,
106 FlowCall,
107 BuiltinEmit { kind: EmitKind },
108 Wait,
109}
110
/// Flavour of a builtin `emit.*` node.
#[derive(Debug, Clone)]
enum EmitKind {
    /// Log emission.
    Log,
    /// Response emission.
    Response,
    /// Any other emit reference, kept verbatim.
    Other(String),
}
117
/// Values taken from the node definition that take part in resolving a
/// `component.exec` call alongside the rendered payload.
struct ComponentOverrides<'a> {
    /// Component to invoke; when set it wins over the payload's component.
    component: Option<&'a str>,
    /// Operation declared on the node, if any.
    operation: Option<&'a str>,
}
122
123struct ComponentCall {
124 component_ref: String,
125 operation: String,
126 input: Value,
127 config: Value,
128}
129
130impl FlowExecution {
131 fn completed(output: Value) -> Self {
132 Self {
133 output,
134 status: FlowStatus::Completed,
135 }
136 }
137
138 fn waiting(output: Value, wait: FlowWait) -> Self {
139 Self {
140 output,
141 status: FlowStatus::Waiting(Box::new(wait)),
142 }
143 }
144}
145
146impl FlowEngine {
147 pub async fn new(packs: Vec<Arc<PackRuntime>>, config: Arc<HostConfig>) -> Result<Self> {
148 let mut flow_sources: HashMap<FlowKey, usize> = HashMap::new();
149 let mut descriptors = Vec::new();
150 let mut bindings = HashMap::new();
151 for pack in &config.pack_bindings {
152 bindings.insert(pack.pack_id.clone(), pack.flows.clone());
153 }
154 let enforce_bindings = !bindings.is_empty();
155 for (idx, pack) in packs.iter().enumerate() {
156 let pack_id = pack.metadata().pack_id.clone();
157 if enforce_bindings && !bindings.contains_key(&pack_id) {
158 bail!("no gtbind entries found for pack {}", pack_id);
159 }
160 let flows = pack.list_flows().await?;
161 let allowed = bindings.get(&pack_id).map(|flows| {
162 flows
163 .iter()
164 .cloned()
165 .collect::<std::collections::HashSet<_>>()
166 });
167 let mut seen = std::collections::HashSet::new();
168 for flow in flows {
169 if let Some(ref allow) = allowed
170 && !allow.contains(&flow.id)
171 {
172 continue;
173 }
174 seen.insert(flow.id.clone());
175 tracing::info!(
176 flow_id = %flow.id,
177 flow_type = %flow.flow_type,
178 pack_id = %flow.pack_id,
179 pack_index = idx,
180 "registered flow"
181 );
182 flow_sources.insert(
183 FlowKey {
184 pack_id: flow.pack_id.clone(),
185 flow_id: flow.id.clone(),
186 },
187 idx,
188 );
189 descriptors.retain(|existing: &FlowDescriptor| {
190 !(existing.id == flow.id && existing.pack_id == flow.pack_id)
191 });
192 descriptors.push(flow);
193 }
194 if let Some(allow) = allowed {
195 let missing = allow.difference(&seen).cloned().collect::<Vec<_>>();
196 if !missing.is_empty() {
197 bail!(
198 "gtbind flow ids missing in pack {}: {}",
199 pack_id,
200 missing.join(", ")
201 );
202 }
203 }
204 }
205
206 let mut flow_map = HashMap::new();
207 for flow in &descriptors {
208 let pack_id = flow.pack_id.clone();
209 if let Some(&pack_idx) = flow_sources.get(&FlowKey {
210 pack_id: pack_id.clone(),
211 flow_id: flow.id.clone(),
212 }) {
213 let pack_clone = Arc::clone(&packs[pack_idx]);
214 let flow_id = flow.id.clone();
215 let task_flow_id = flow_id.clone();
216 match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
217 Ok(Ok(loaded_flow)) => {
218 flow_map.insert(
219 FlowKey {
220 pack_id: pack_id.clone(),
221 flow_id,
222 },
223 HostFlow::from(loaded_flow),
224 );
225 }
226 Ok(Err(err)) => {
227 tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
228 }
229 Err(err) => {
230 tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
231 }
232 }
233 }
234 }
235
236 Ok(Self {
237 packs,
238 flows: descriptors,
239 flow_sources,
240 flow_cache: RwLock::new(flow_map),
241 default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
242 validation: config.validation.clone(),
243 })
244 }
245
246 async fn get_or_load_flow(&self, pack_id: &str, flow_id: &str) -> Result<HostFlow> {
247 let key = FlowKey {
248 pack_id: pack_id.to_string(),
249 flow_id: flow_id.to_string(),
250 };
251 if let Some(flow) = self.flow_cache.read().get(&key).cloned() {
252 return Ok(flow);
253 }
254
255 let pack_idx = *self
256 .flow_sources
257 .get(&key)
258 .with_context(|| format!("flow {pack_id}:{flow_id} not registered"))?;
259 let pack = Arc::clone(&self.packs[pack_idx]);
260 let flow_id_owned = flow_id.to_string();
261 let task_flow_id = flow_id_owned.clone();
262 let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
263 .await
264 .context("failed to join flow metadata task")??;
265 let host_flow = HostFlow::from(flow);
266 self.flow_cache.write().insert(
267 FlowKey {
268 pack_id: pack_id.to_string(),
269 flow_id: flow_id_owned.clone(),
270 },
271 host_flow.clone(),
272 );
273 Ok(host_flow)
274 }
275
276 pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
277 let span = tracing::info_span!(
278 "flow.execute",
279 tenant = tracing::field::Empty,
280 flow_id = tracing::field::Empty,
281 node_id = tracing::field::Empty,
282 tool = tracing::field::Empty,
283 action = tracing::field::Empty
284 );
285 annotate_span(
286 &span,
287 &FlowSpanAttributes {
288 tenant: ctx.tenant,
289 flow_id: ctx.flow_id,
290 node_id: ctx.node_id,
291 tool: ctx.tool,
292 action: ctx.action,
293 },
294 );
295 set_flow_context(
296 &self.default_env,
297 ctx.tenant,
298 ctx.flow_id,
299 ctx.node_id,
300 ctx.provider_id,
301 ctx.session_id,
302 );
303 let retry_config = ctx.retry_config;
304 let original_input = input;
305 async move {
306 let mut attempt = 0u32;
307 loop {
308 attempt += 1;
309 match self.execute_once(&ctx, original_input.clone()).await {
310 Ok(value) => return Ok(value),
311 Err(err) => {
312 if attempt >= retry_config.max_attempts || !should_retry(&err) {
313 return Err(err);
314 }
315 let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
316 tracing::warn!(
317 tenant = ctx.tenant,
318 flow_id = ctx.flow_id,
319 attempt,
320 max_attempts = retry_config.max_attempts,
321 delay_ms = delay,
322 error = %err,
323 "transient flow execution failure, backing off"
324 );
325 tokio::time::sleep(Duration::from_millis(delay)).await;
326 }
327 }
328 }
329 }
330 .instrument(span)
331 .await
332 }
333
334 pub async fn resume(
335 &self,
336 ctx: FlowContext<'_>,
337 snapshot: FlowSnapshot,
338 input: Value,
339 ) -> Result<FlowExecution> {
340 if snapshot.pack_id != ctx.pack_id {
341 bail!(
342 "snapshot pack {} does not match requested {}",
343 snapshot.pack_id,
344 ctx.pack_id
345 );
346 }
347 if snapshot.flow_id != ctx.flow_id {
348 bail!(
349 "snapshot flow {} does not match requested {}",
350 snapshot.flow_id,
351 ctx.flow_id
352 );
353 }
354 let flow_ir = self.get_or_load_flow(ctx.pack_id, ctx.flow_id).await?;
355 let mut state = snapshot.state;
356 state.replace_input(input);
357 state.ensure_entry();
358 self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
359 .await
360 }
361
362 async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
363 let flow_ir = self.get_or_load_flow(ctx.pack_id, ctx.flow_id).await?;
364 let state = ExecutionState::new(input);
365 self.drive_flow(ctx, flow_ir, state, None).await
366 }
367
368 async fn drive_flow(
369 &self,
370 ctx: &FlowContext<'_>,
371 flow_ir: HostFlow,
372 mut state: ExecutionState,
373 resume_from: Option<String>,
374 ) -> Result<FlowExecution> {
375 let mut current = match resume_from {
376 Some(node) => NodeId::from_str(&node)
377 .with_context(|| format!("invalid resume node id `{node}`"))?,
378 None => flow_ir
379 .start
380 .clone()
381 .or_else(|| flow_ir.nodes.keys().next().cloned())
382 .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
383 };
384
385 loop {
386 let node = flow_ir
387 .nodes
388 .get(¤t)
389 .with_context(|| format!("node {} not found", current.as_str()))?;
390
391 let payload_template = node.payload_expr.clone();
392 let prev = state
393 .last_output
394 .as_ref()
395 .cloned()
396 .unwrap_or_else(|| Value::Object(JsonMap::new()));
397 let ctx_value = template_context(&state, prev);
398 let payload =
399 render_template_value(&payload_template, &ctx_value, TemplateOptions::default())
400 .context("failed to render node input template")?;
401 let observed_payload = payload.clone();
402 let node_id = current.clone();
403 let event = NodeEvent {
404 context: ctx,
405 node_id: node_id.as_str(),
406 node,
407 payload: &observed_payload,
408 };
409 if let Some(observer) = ctx.observer {
410 observer.on_node_start(&event);
411 }
412 let dispatch = self
413 .dispatch_node(ctx, node_id.as_str(), node, &mut state, payload, &event)
414 .await;
415 let DispatchOutcome {
416 output,
417 wait_reason,
418 } = match dispatch {
419 Ok(outcome) => outcome,
420 Err(err) => {
421 if let Some(observer) = ctx.observer {
422 observer.on_node_error(&event, err.as_ref());
423 }
424 return Err(err);
425 }
426 };
427
428 state.nodes.insert(node_id.clone().into(), output.clone());
429 state.last_output = Some(output.payload.clone());
430 if let Some(observer) = ctx.observer {
431 observer.on_node_end(&event, &output.payload);
432 }
433
434 let (next, should_exit) = match &node.routing {
435 Routing::Next { node_id } => (Some(node_id.clone()), false),
436 Routing::End | Routing::Reply => (None, true),
437 Routing::Branch { default, .. } => (default.clone(), default.is_none()),
438 Routing::Custom(raw) => {
439 tracing::warn!(
440 flow_id = %flow_ir.id,
441 node_id = %node_id,
442 routing = ?raw,
443 "unsupported routing; terminating flow"
444 );
445 (None, true)
446 }
447 };
448
449 if let Some(wait_reason) = wait_reason {
450 let resume_target = next.clone().ok_or_else(|| {
451 anyhow!(
452 "session.wait node {} requires a non-empty route",
453 current.as_str()
454 )
455 })?;
456 let mut snapshot_state = state.clone();
457 snapshot_state.clear_egress();
458 let snapshot = FlowSnapshot {
459 pack_id: ctx.pack_id.to_string(),
460 flow_id: ctx.flow_id.to_string(),
461 next_node: resume_target.as_str().to_string(),
462 state: snapshot_state,
463 };
464 let output_value = state.clone().finalize_with(None);
465 return Ok(FlowExecution::waiting(
466 output_value,
467 FlowWait {
468 reason: Some(wait_reason),
469 snapshot,
470 },
471 ));
472 }
473
474 if should_exit {
475 return Ok(FlowExecution::completed(
476 state.finalize_with(Some(output.payload.clone())),
477 ));
478 }
479
480 match next {
481 Some(n) => current = n,
482 None => {
483 return Ok(FlowExecution::completed(
484 state.finalize_with(Some(output.payload.clone())),
485 ));
486 }
487 }
488 }
489 }
490
491 async fn dispatch_node(
492 &self,
493 ctx: &FlowContext<'_>,
494 node_id: &str,
495 node: &HostNode,
496 state: &mut ExecutionState,
497 payload: Value,
498 event: &NodeEvent<'_>,
499 ) -> Result<DispatchOutcome> {
500 match &node.kind {
501 NodeKind::Exec { target_component } => self
502 .execute_component_exec(
503 ctx,
504 node_id,
505 node,
506 payload,
507 event,
508 ComponentOverrides {
509 component: Some(target_component.as_str()),
510 operation: node.operation_name.as_deref(),
511 },
512 )
513 .await
514 .map(DispatchOutcome::complete),
515 NodeKind::PackComponent { component_ref } => self
516 .execute_component_call(ctx, node_id, node, payload, component_ref.as_str(), event)
517 .await
518 .map(DispatchOutcome::complete),
519 NodeKind::FlowCall => self
520 .execute_flow_call(ctx, payload)
521 .await
522 .map(DispatchOutcome::complete),
523 NodeKind::ProviderInvoke => self
524 .execute_provider_invoke(ctx, node_id, state, payload, event)
525 .await
526 .map(DispatchOutcome::complete),
527 NodeKind::BuiltinEmit { kind } => {
528 match kind {
529 EmitKind::Log | EmitKind::Response => {}
530 EmitKind::Other(component) => {
531 tracing::debug!(%component, "handling emit.* as builtin");
532 }
533 }
534 state.push_egress(payload.clone());
535 Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
536 }
537 NodeKind::Wait => {
538 let reason = extract_wait_reason(&payload);
539 Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
540 }
541 }
542 }
543
544 async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
545 #[derive(Deserialize)]
546 struct FlowCallPayload {
547 #[serde(alias = "flow")]
548 flow_id: String,
549 #[serde(default)]
550 input: Value,
551 }
552
553 let call: FlowCallPayload =
554 serde_json::from_value(payload).context("invalid payload for flow.call node")?;
555 if call.flow_id.trim().is_empty() {
556 bail!("flow.call requires a non-empty flow_id");
557 }
558
559 let sub_input = if call.input.is_null() {
560 Value::Null
561 } else {
562 call.input
563 };
564
565 let flow_id_owned = call.flow_id;
566 let action = "flow.call";
567 let sub_ctx = FlowContext {
568 tenant: ctx.tenant,
569 pack_id: ctx.pack_id,
570 flow_id: flow_id_owned.as_str(),
571 node_id: None,
572 tool: ctx.tool,
573 action: Some(action),
574 session_id: ctx.session_id,
575 provider_id: ctx.provider_id,
576 retry_config: ctx.retry_config,
577 observer: ctx.observer,
578 mocks: ctx.mocks,
579 };
580
581 let execution = Box::pin(self.execute(sub_ctx, sub_input))
582 .await
583 .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
584 match execution.status {
585 FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
586 FlowStatus::Waiting(wait) => bail!(
587 "flow.call cannot pause (flow {} waiting {:?})",
588 flow_id_owned,
589 wait.reason
590 ),
591 }
592 }
593
594 async fn execute_component_exec(
595 &self,
596 ctx: &FlowContext<'_>,
597 node_id: &str,
598 node: &HostNode,
599 payload: Value,
600 event: &NodeEvent<'_>,
601 overrides: ComponentOverrides<'_>,
602 ) -> Result<NodeOutput> {
603 #[derive(Deserialize)]
604 struct ComponentPayload {
605 #[serde(default, alias = "component_ref", alias = "component")]
606 component: Option<String>,
607 #[serde(alias = "op")]
608 operation: Option<String>,
609 #[serde(default)]
610 input: Value,
611 #[serde(default)]
612 config: Value,
613 }
614
615 let payload: ComponentPayload =
616 serde_json::from_value(payload).context("invalid payload for component.exec")?;
617 let component_ref = overrides
618 .component
619 .map(str::to_string)
620 .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
621 .with_context(|| "component.exec requires a component_ref")?;
622 let operation = resolve_component_operation(
623 node_id,
624 node.component_id.as_str(),
625 payload.operation,
626 overrides.operation,
627 node.operation_in_mapping.as_deref(),
628 )?;
629 let call = ComponentCall {
630 component_ref,
631 operation,
632 input: payload.input,
633 config: payload.config,
634 };
635
636 self.invoke_component_call(ctx, node_id, call, event).await
637 }
638
639 async fn execute_component_call(
640 &self,
641 ctx: &FlowContext<'_>,
642 node_id: &str,
643 node: &HostNode,
644 payload: Value,
645 component_ref: &str,
646 event: &NodeEvent<'_>,
647 ) -> Result<NodeOutput> {
648 let payload_operation = extract_operation_from_mapping(&payload);
649 let (input, config) = split_operation_payload(payload);
650 let operation = resolve_component_operation(
651 node_id,
652 node.component_id.as_str(),
653 payload_operation,
654 node.operation_name.as_deref(),
655 node.operation_in_mapping.as_deref(),
656 )?;
657 let call = ComponentCall {
658 component_ref: component_ref.to_string(),
659 operation,
660 input,
661 config,
662 };
663 self.invoke_component_call(ctx, node_id, call, event).await
664 }
665
666 async fn invoke_component_call(
667 &self,
668 ctx: &FlowContext<'_>,
669 node_id: &str,
670 call: ComponentCall,
671 event: &NodeEvent<'_>,
672 ) -> Result<NodeOutput> {
673 self.validate_component(ctx, event, &call)?;
674 let input_json = serde_json::to_string(&call.input)?;
675 let config_json = if call.config.is_null() {
676 None
677 } else {
678 Some(serde_json::to_string(&call.config)?)
679 };
680
681 let key = FlowKey {
682 pack_id: ctx.pack_id.to_string(),
683 flow_id: ctx.flow_id.to_string(),
684 };
685 let pack_idx = *self.flow_sources.get(&key).with_context(|| {
686 format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
687 })?;
688 let pack = Arc::clone(&self.packs[pack_idx]);
689 let exec_ctx = component_exec_ctx(ctx, node_id);
690 let value = pack
691 .invoke_component(
692 call.component_ref.as_str(),
693 exec_ctx,
694 call.operation.as_str(),
695 config_json,
696 input_json,
697 )
698 .await?;
699
700 if let Some((code, message)) = component_error(&value) {
701 bail!(
702 "component {} failed: {}: {}",
703 call.component_ref,
704 code,
705 message
706 );
707 }
708 Ok(NodeOutput::new(value))
709 }
710
711 async fn execute_provider_invoke(
712 &self,
713 ctx: &FlowContext<'_>,
714 node_id: &str,
715 state: &ExecutionState,
716 payload: Value,
717 event: &NodeEvent<'_>,
718 ) -> Result<NodeOutput> {
719 #[derive(Deserialize)]
720 struct ProviderPayload {
721 #[serde(default)]
722 provider_id: Option<String>,
723 #[serde(default)]
724 provider_type: Option<String>,
725 #[serde(default, alias = "operation")]
726 op: Option<String>,
727 #[serde(default)]
728 input: Value,
729 #[serde(default)]
730 in_map: Value,
731 #[serde(default)]
732 out_map: Value,
733 #[serde(default)]
734 err_map: Value,
735 }
736
737 let payload: ProviderPayload =
738 serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
739 let op = payload
740 .op
741 .as_deref()
742 .filter(|v| !v.trim().is_empty())
743 .with_context(|| "provider.invoke requires an op")?
744 .to_string();
745
746 let prev = state
747 .last_output
748 .as_ref()
749 .cloned()
750 .unwrap_or_else(|| Value::Object(JsonMap::new()));
751 let base_ctx = template_context(state, prev);
752
753 let input_value = if !payload.in_map.is_null() {
754 let mut ctx_value = base_ctx.clone();
755 if let Value::Object(ref mut map) = ctx_value {
756 map.insert("input".into(), payload.input.clone());
757 map.insert("result".into(), payload.input.clone());
758 }
759 render_template_value(
760 &payload.in_map,
761 &ctx_value,
762 TemplateOptions {
763 allow_pointer: true,
764 },
765 )
766 .context("failed to render provider.invoke in_map")?
767 } else if !payload.input.is_null() {
768 payload.input
769 } else {
770 Value::Null
771 };
772 let input_json = serde_json::to_vec(&input_value)?;
773
774 self.validate_tool(
775 ctx,
776 event,
777 payload.provider_id.as_deref(),
778 payload.provider_type.as_deref(),
779 &op,
780 &input_value,
781 )?;
782
783 let key = FlowKey {
784 pack_id: ctx.pack_id.to_string(),
785 flow_id: ctx.flow_id.to_string(),
786 };
787 let pack_idx = *self.flow_sources.get(&key).with_context(|| {
788 format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
789 })?;
790 let pack = Arc::clone(&self.packs[pack_idx]);
791 let binding = pack.resolve_provider(
792 payload.provider_id.as_deref(),
793 payload.provider_type.as_deref(),
794 )?;
795 let exec_ctx = component_exec_ctx(ctx, node_id);
796 let result = pack
797 .invoke_provider(&binding, exec_ctx, &op, input_json)
798 .await?;
799
800 let output = if payload.out_map.is_null() {
801 result
802 } else {
803 let mut ctx_value = base_ctx;
804 if let Value::Object(ref mut map) = ctx_value {
805 map.insert("input".into(), result.clone());
806 map.insert("result".into(), result.clone());
807 }
808 render_template_value(
809 &payload.out_map,
810 &ctx_value,
811 TemplateOptions {
812 allow_pointer: true,
813 },
814 )
815 .context("failed to render provider.invoke out_map")?
816 };
817 let _ = payload.err_map;
818 Ok(NodeOutput::new(output))
819 }
820
821 fn validate_component(
822 &self,
823 ctx: &FlowContext<'_>,
824 event: &NodeEvent<'_>,
825 call: &ComponentCall,
826 ) -> Result<()> {
827 if self.validation.mode == ValidationMode::Off {
828 return Ok(());
829 }
830 let mut metadata = JsonMap::new();
831 metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
832 if let Some(id) = ctx.session_id {
833 metadata.insert("session".to_string(), json!({ "id": id }));
834 }
835 let envelope = json!({
836 "component_id": call.component_ref,
837 "operation": call.operation,
838 "input": call.input,
839 "config": call.config,
840 "metadata": Value::Object(metadata),
841 });
842 let issues = validate_component_envelope(&envelope);
843 self.report_validation(ctx, event, "component", issues)
844 }
845
846 fn validate_tool(
847 &self,
848 ctx: &FlowContext<'_>,
849 event: &NodeEvent<'_>,
850 provider_id: Option<&str>,
851 provider_type: Option<&str>,
852 operation: &str,
853 input: &Value,
854 ) -> Result<()> {
855 if self.validation.mode == ValidationMode::Off {
856 return Ok(());
857 }
858 let tool_id = provider_id.or(provider_type).unwrap_or("provider.invoke");
859 let mut metadata = JsonMap::new();
860 metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
861 if let Some(id) = ctx.session_id {
862 metadata.insert("session".to_string(), json!({ "id": id }));
863 }
864 let envelope = json!({
865 "tool_id": tool_id,
866 "operation": operation,
867 "input": input,
868 "metadata": Value::Object(metadata),
869 });
870 let issues = validate_tool_envelope(&envelope);
871 self.report_validation(ctx, event, "tool", issues)
872 }
873
874 fn report_validation(
875 &self,
876 ctx: &FlowContext<'_>,
877 event: &NodeEvent<'_>,
878 kind: &str,
879 issues: Vec<ValidationIssue>,
880 ) -> Result<()> {
881 if issues.is_empty() {
882 return Ok(());
883 }
884 if let Some(observer) = ctx.observer {
885 observer.on_validation(event, &issues);
886 }
887 match self.validation.mode {
888 ValidationMode::Warn => {
889 tracing::warn!(
890 tenant = ctx.tenant,
891 flow_id = ctx.flow_id,
892 node_id = event.node_id,
893 kind,
894 issues = ?issues,
895 "invocation envelope validation issues"
896 );
897 Ok(())
898 }
899 ValidationMode::Error => {
900 tracing::error!(
901 tenant = ctx.tenant,
902 flow_id = ctx.flow_id,
903 node_id = event.node_id,
904 kind,
905 issues = ?issues,
906 "invocation envelope validation failed"
907 );
908 bail!("invocation_validation_failed");
909 }
910 ValidationMode::Off => Ok(()),
911 }
912 }
913
914 pub fn flows(&self) -> &[FlowDescriptor] {
915 &self.flows
916 }
917
918 pub fn flow_by_key(&self, pack_id: &str, flow_id: &str) -> Option<&FlowDescriptor> {
919 self.flows
920 .iter()
921 .find(|descriptor| descriptor.pack_id == pack_id && descriptor.id == flow_id)
922 }
923
924 pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
925 let mut matches = self
926 .flows
927 .iter()
928 .filter(|descriptor| descriptor.flow_type == flow_type);
929 let first = matches.next()?;
930 if matches.next().is_some() {
931 return None;
932 }
933 Some(first)
934 }
935
936 pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
937 let mut matches = self
938 .flows
939 .iter()
940 .filter(|descriptor| descriptor.id == flow_id);
941 let first = matches.next()?;
942 if matches.next().is_some() {
943 return None;
944 }
945 Some(first)
946 }
947}
948
949pub trait ExecutionObserver: Send + Sync {
950 fn on_node_start(&self, event: &NodeEvent<'_>);
951 fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
952 fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
953 fn on_validation(&self, _event: &NodeEvent<'_>, _issues: &[ValidationIssue]) {}
954}
955
956pub struct NodeEvent<'a> {
957 pub context: &'a FlowContext<'a>,
958 pub node_id: &'a str,
959 pub node: &'a HostNode,
960 pub payload: &'a Value,
961}
962
963#[derive(Clone, Debug, Serialize, Deserialize)]
964pub struct ExecutionState {
965 #[serde(default)]
966 entry: Value,
967 #[serde(default)]
968 input: Value,
969 #[serde(default)]
970 nodes: HashMap<String, NodeOutput>,
971 #[serde(default)]
972 egress: Vec<Value>,
973 #[serde(default, skip_serializing_if = "Option::is_none")]
974 last_output: Option<Value>,
975}
976
977impl ExecutionState {
978 fn new(input: Value) -> Self {
979 Self {
980 entry: input.clone(),
981 input,
982 nodes: HashMap::new(),
983 egress: Vec::new(),
984 last_output: None,
985 }
986 }
987
988 fn ensure_entry(&mut self) {
989 if self.entry.is_null() {
990 self.entry = self.input.clone();
991 }
992 }
993
994 fn context(&self) -> Value {
995 let mut nodes = JsonMap::new();
996 for (id, output) in &self.nodes {
997 nodes.insert(
998 id.clone(),
999 json!({
1000 "ok": output.ok,
1001 "payload": output.payload.clone(),
1002 "meta": output.meta.clone(),
1003 }),
1004 );
1005 }
1006 json!({
1007 "entry": self.entry.clone(),
1008 "input": self.input.clone(),
1009 "nodes": nodes,
1010 })
1011 }
1012
1013 fn outputs_map(&self) -> JsonMap<String, Value> {
1014 let mut outputs = JsonMap::new();
1015 for (id, output) in &self.nodes {
1016 outputs.insert(id.clone(), output.payload.clone());
1017 }
1018 outputs
1019 }
1020 fn push_egress(&mut self, payload: Value) {
1021 self.egress.push(payload);
1022 }
1023
1024 fn replace_input(&mut self, input: Value) {
1025 self.input = input;
1026 }
1027
1028 fn clear_egress(&mut self) {
1029 self.egress.clear();
1030 }
1031
1032 fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
1033 if self.egress.is_empty() {
1034 return final_payload.unwrap_or(Value::Null);
1035 }
1036 let mut emitted = std::mem::take(&mut self.egress);
1037 if let Some(value) = final_payload {
1038 match value {
1039 Value::Null => {}
1040 Value::Array(items) => emitted.extend(items),
1041 other => emitted.push(other),
1042 }
1043 }
1044 Value::Array(emitted)
1045 }
1046}
1047
1048#[derive(Clone, Debug, Serialize, Deserialize)]
1049struct NodeOutput {
1050 ok: bool,
1051 payload: Value,
1052 meta: Value,
1053}
1054
1055impl NodeOutput {
1056 fn new(payload: Value) -> Self {
1057 Self {
1058 ok: true,
1059 payload,
1060 meta: Value::Null,
1061 }
1062 }
1063}
1064
1065struct DispatchOutcome {
1066 output: NodeOutput,
1067 wait_reason: Option<String>,
1068}
1069
1070impl DispatchOutcome {
1071 fn complete(output: NodeOutput) -> Self {
1072 Self {
1073 output,
1074 wait_reason: None,
1075 }
1076 }
1077
1078 fn wait(output: NodeOutput, reason: Option<String>) -> Self {
1079 Self {
1080 output,
1081 wait_reason: reason,
1082 }
1083 }
1084}
1085
1086fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
1087 ComponentExecCtx {
1088 tenant: ComponentTenantCtx {
1089 tenant: ctx.tenant.to_string(),
1090 team: None,
1091 user: ctx.provider_id.map(str::to_string),
1092 trace_id: None,
1093 correlation_id: ctx.session_id.map(str::to_string),
1094 deadline_unix_ms: None,
1095 attempt: 1,
1096 idempotency_key: ctx.session_id.map(str::to_string),
1097 },
1098 flow_id: ctx.flow_id.to_string(),
1099 node_id: Some(node_id.to_string()),
1100 }
1101}
1102
1103fn component_error(value: &Value) -> Option<(String, String)> {
1104 let obj = value.as_object()?;
1105 let ok = obj.get("ok").and_then(Value::as_bool)?;
1106 if ok {
1107 return None;
1108 }
1109 let err = obj.get("error")?.as_object()?;
1110 let code = err
1111 .get("code")
1112 .and_then(Value::as_str)
1113 .unwrap_or("component_error");
1114 let message = err
1115 .get("message")
1116 .and_then(Value::as_str)
1117 .unwrap_or("component reported error");
1118 Some((code.to_string(), message.to_string()))
1119}
1120
1121fn extract_wait_reason(payload: &Value) -> Option<String> {
1122 match payload {
1123 Value::String(s) => Some(s.clone()),
1124 Value::Object(map) => map
1125 .get("reason")
1126 .and_then(Value::as_str)
1127 .map(|value| value.to_string()),
1128 _ => None,
1129 }
1130}
1131
1132fn template_context(state: &ExecutionState, prev: Value) -> Value {
1133 let entry = if state.entry.is_null() {
1134 Value::Object(JsonMap::new())
1135 } else {
1136 state.entry.clone()
1137 };
1138 let mut ctx = JsonMap::new();
1139 ctx.insert("entry".into(), entry);
1140 ctx.insert("prev".into(), prev);
1141 ctx.insert("node".into(), Value::Object(state.outputs_map()));
1142 ctx.insert("state".into(), state.context());
1143 Value::Object(ctx)
1144}
1145
1146impl From<Flow> for HostFlow {
1147 fn from(value: Flow) -> Self {
1148 let mut nodes = IndexMap::new();
1149 for (id, node) in value.nodes {
1150 nodes.insert(id.clone(), HostNode::from(node));
1151 }
1152 let start = value
1153 .entrypoints
1154 .get("default")
1155 .and_then(Value::as_str)
1156 .and_then(|id| NodeId::from_str(id).ok())
1157 .or_else(|| nodes.keys().next().cloned());
1158 Self {
1159 id: value.id.as_str().to_string(),
1160 start,
1161 nodes,
1162 }
1163 }
1164}
1165
1166impl From<Node> for HostNode {
1167 fn from(node: Node) -> Self {
1168 let component_ref = node.component.id.as_str().to_string();
1169 let raw_operation = node.component.operation.clone();
1170 let operation_in_mapping = extract_operation_from_mapping(&node.input.mapping);
1171 let operation_is_component_exec = raw_operation.as_deref() == Some("component.exec");
1172 let operation_is_emit = raw_operation
1173 .as_deref()
1174 .map(|op| op.starts_with("emit."))
1175 .unwrap_or(false);
1176 let is_component_exec = component_ref == "component.exec" || operation_is_component_exec;
1177
1178 let kind = if is_component_exec {
1179 let target = if component_ref == "component.exec" {
1180 if let Some(op) = raw_operation
1181 .as_deref()
1182 .filter(|op| op.starts_with("emit."))
1183 {
1184 op.to_string()
1185 } else {
1186 extract_target_component(&node.input.mapping)
1187 .unwrap_or_else(|| "component.exec".to_string())
1188 }
1189 } else {
1190 extract_target_component(&node.input.mapping)
1191 .unwrap_or_else(|| component_ref.clone())
1192 };
1193 if target.starts_with("emit.") {
1194 NodeKind::BuiltinEmit {
1195 kind: emit_kind_from_ref(&target),
1196 }
1197 } else {
1198 NodeKind::Exec {
1199 target_component: target,
1200 }
1201 }
1202 } else if operation_is_emit {
1203 NodeKind::BuiltinEmit {
1204 kind: emit_kind_from_ref(raw_operation.as_deref().unwrap_or("emit.log")),
1205 }
1206 } else {
1207 match component_ref.as_str() {
1208 "flow.call" => NodeKind::FlowCall,
1209 "provider.invoke" => NodeKind::ProviderInvoke,
1210 "session.wait" => NodeKind::Wait,
1211 comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
1212 kind: emit_kind_from_ref(comp),
1213 },
1214 other => NodeKind::PackComponent {
1215 component_ref: other.to_string(),
1216 },
1217 }
1218 };
1219 let component_label = match &kind {
1220 NodeKind::Exec { .. } => "component.exec".to_string(),
1221 NodeKind::PackComponent { component_ref } => component_ref.clone(),
1222 NodeKind::ProviderInvoke => "provider.invoke".to_string(),
1223 NodeKind::FlowCall => "flow.call".to_string(),
1224 NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
1225 NodeKind::Wait => "session.wait".to_string(),
1226 };
1227 let operation_name = if is_component_exec && operation_is_component_exec {
1228 None
1229 } else {
1230 raw_operation.clone()
1231 };
1232 let payload_expr = match kind {
1233 NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
1234 _ => node.input.mapping.clone(),
1235 };
1236 Self {
1237 kind,
1238 component: component_label,
1239 component_id: if is_component_exec {
1240 "component.exec".to_string()
1241 } else {
1242 component_ref
1243 },
1244 operation_name,
1245 operation_in_mapping,
1246 payload_expr,
1247 routing: node.routing,
1248 }
1249 }
1250}
1251
1252fn extract_target_component(payload: &Value) -> Option<String> {
1253 match payload {
1254 Value::Object(map) => map
1255 .get("component")
1256 .or_else(|| map.get("component_ref"))
1257 .and_then(Value::as_str)
1258 .map(|s| s.to_string()),
1259 _ => None,
1260 }
1261}
1262
1263fn extract_operation_from_mapping(payload: &Value) -> Option<String> {
1264 match payload {
1265 Value::Object(map) => map
1266 .get("operation")
1267 .or_else(|| map.get("op"))
1268 .and_then(Value::as_str)
1269 .map(str::trim)
1270 .filter(|value| !value.is_empty())
1271 .map(|value| value.to_string()),
1272 _ => None,
1273 }
1274}
1275
1276fn extract_emit_payload(payload: &Value) -> Value {
1277 if let Value::Object(map) = payload {
1278 if let Some(input) = map.get("input") {
1279 return input.clone();
1280 }
1281 if let Some(inner) = map.get("payload") {
1282 return inner.clone();
1283 }
1284 }
1285 payload.clone()
1286}
1287
1288fn split_operation_payload(payload: Value) -> (Value, Value) {
1289 if let Value::Object(mut map) = payload.clone()
1290 && map.contains_key("input")
1291 {
1292 let input = map.remove("input").unwrap_or(Value::Null);
1293 let config = map.remove("config").unwrap_or(Value::Null);
1294 let legacy_only = map.keys().all(|key| {
1295 matches!(
1296 key.as_str(),
1297 "operation" | "op" | "component" | "component_ref"
1298 )
1299 });
1300 if legacy_only {
1301 return (input, config);
1302 }
1303 }
1304 (payload, Value::Null)
1305}
1306
1307fn resolve_component_operation(
1308 node_id: &str,
1309 component_label: &str,
1310 payload_operation: Option<String>,
1311 operation_override: Option<&str>,
1312 operation_in_mapping: Option<&str>,
1313) -> Result<String> {
1314 if let Some(op) = operation_override
1315 .map(str::trim)
1316 .filter(|value| !value.is_empty())
1317 {
1318 return Ok(op.to_string());
1319 }
1320
1321 if let Some(op) = payload_operation
1322 .as_deref()
1323 .map(str::trim)
1324 .filter(|value| !value.is_empty())
1325 {
1326 return Ok(op.to_string());
1327 }
1328
1329 let mut message = format!(
1330 "missing operation for node `{}` (component `{}`); expected node.component.operation to be set",
1331 node_id, component_label,
1332 );
1333 if let Some(found) = operation_in_mapping {
1334 message.push_str(&format!(
1335 ". Found operation in input.mapping (`{}`) but this is not used; pack compiler must preserve node.component.operation.",
1336 found
1337 ));
1338 }
1339 bail!(message);
1340}
1341
1342fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
1343 match component_ref {
1344 "emit.log" => EmitKind::Log,
1345 "emit.response" => EmitKind::Response,
1346 other => EmitKind::Other(other.to_string()),
1347 }
1348}
1349
1350fn emit_ref_from_kind(kind: &EmitKind) -> String {
1351 match kind {
1352 EmitKind::Log => "emit.log".to_string(),
1353 EmitKind::Response => "emit.response".to_string(),
1354 EmitKind::Other(other) => other.clone(),
1355 }
1356}
1357
#[cfg(test)]
mod tests {
    use super::*;
    use crate::validate::{ValidationConfig, ValidationMode};
    use greentic_types::{
        Flow, FlowComponentRef, FlowId, FlowKind, InputMapping, Node, NodeId, OutputMapping,
        Routing, TelemetryHints,
    };
    use serde_json::json;
    use std::collections::BTreeMap;
    use std::str::FromStr;
    use std::sync::Mutex;
    use tokio::runtime::Runtime;

    /// Retry policy shared by the tests: one attempt with a 1 ms base delay.
    const SINGLE_ATTEMPT: RetryConfig = RetryConfig {
        max_attempts: 1,
        base_delay_ms: 1,
    };

    /// Engine with no packs, no flows, an empty cache and validation turned off.
    fn minimal_engine() -> FlowEngine {
        FlowEngine {
            packs: Vec::new(),
            flows: Vec::new(),
            flow_sources: HashMap::new(),
            flow_cache: RwLock::new(HashMap::new()),
            default_env: "local".to_string(),
            validation: ValidationConfig {
                mode: ValidationMode::Off,
            },
        }
    }

    /// Executes a `component.exec` node that declares no operation and returns the
    /// text of the resulting error.
    ///
    /// `operation_in_mapping` is stored on the node so the tests can check the
    /// optional "found in input.mapping" hint.
    fn missing_operation_message(node_id: &str, operation_in_mapping: Option<&str>) -> String {
        let engine = minimal_engine();
        let rt = Runtime::new().unwrap();
        let ctx = FlowContext {
            tenant: "tenant",
            pack_id: "test-pack",
            flow_id: "flow",
            node_id: Some(node_id),
            tool: None,
            action: None,
            session_id: None,
            provider_id: None,
            retry_config: SINGLE_ATTEMPT,
            observer: None,
            mocks: None,
        };
        let node = HostNode {
            kind: NodeKind::Exec {
                target_component: "qa.process".into(),
            },
            component: "component.exec".into(),
            component_id: "component.exec".into(),
            operation_name: None,
            operation_in_mapping: operation_in_mapping.map(str::to_string),
            payload_expr: Value::Null,
            routing: Routing::End,
        };
        let payload = json!({ "component": "qa.process" });
        let event = NodeEvent {
            context: &ctx,
            node_id,
            node: &node,
            payload: &payload,
        };
        let err = rt
            .block_on(engine.execute_component_exec(
                &ctx,
                node_id,
                &node,
                payload.clone(),
                &event,
                ComponentOverrides {
                    component: None,
                    operation: None,
                },
            ))
            .unwrap_err();
        err.to_string()
    }

    #[test]
    fn templating_renders_with_partials_and_data() {
        let mut state = ExecutionState::new(json!({ "city": "London" }));
        state.nodes.insert(
            "forecast".to_string(),
            NodeOutput::new(json!({ "temp": "20C" })),
        );

        let ctx = state.context();
        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
    }

    #[test]
    fn finalize_wraps_emitted_payloads() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "first" }));
        state.push_egress(json!({ "text": "second" }));
        let result = state.finalize_with(Some(json!({ "text": "final" })));
        assert_eq!(
            result,
            json!([
                { "text": "first" },
                { "text": "second" },
                { "text": "final" }
            ])
        );
    }

    #[test]
    fn finalize_flattens_final_array() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "only" }));
        let result = state.finalize_with(Some(json!([
            { "text": "extra-1" },
            { "text": "extra-2" }
        ])));
        assert_eq!(
            result,
            json!([
                { "text": "only" },
                { "text": "extra-1" },
                { "text": "extra-2" }
            ])
        );
    }

    #[test]
    fn missing_operation_reports_node_and_component() {
        let message = missing_operation_message("missing-op", None);
        assert!(
            message.contains("missing operation for node `missing-op`"),
            "unexpected message: {message}"
        );
        assert!(
            message.contains("(component `component.exec`)"),
            "unexpected message: {message}"
        );
    }

    #[test]
    fn missing_operation_mentions_mapping_hint() {
        let message = missing_operation_message("missing-op-hint", Some("render"));
        assert!(
            message.contains("missing operation for node `missing-op-hint`"),
            "unexpected message: {message}"
        );
        assert!(
            message.contains("Found operation in input.mapping (`render`)"),
            "unexpected message: {message}"
        );
    }

    /// Observer that records the id of every started node and the output of every
    /// finished node.
    struct CountingObserver {
        starts: Mutex<Vec<String>>,
        ends: Mutex<Vec<Value>>,
    }

    impl CountingObserver {
        fn new() -> Self {
            Self {
                starts: Mutex::new(Vec::new()),
                ends: Mutex::new(Vec::new()),
            }
        }
    }

    impl ExecutionObserver for CountingObserver {
        fn on_node_start(&self, event: &NodeEvent<'_>) {
            self.starts.lock().unwrap().push(event.node_id.to_string());
        }

        fn on_node_end(&self, _event: &NodeEvent<'_>, output: &Value) {
            self.ends.lock().unwrap().push(output.clone());
        }

        fn on_node_error(&self, _event: &NodeEvent<'_>, _error: &dyn StdError) {}
    }

    #[test]
    fn emits_end_event_for_successful_node() {
        let node_id = NodeId::from_str("emit").unwrap();
        let node = Node {
            id: node_id.clone(),
            component: FlowComponentRef {
                id: "emit.log".parse().unwrap(),
                pack_alias: None,
                operation: None,
            },
            input: InputMapping {
                mapping: json!({ "message": "logged" }),
            },
            output: OutputMapping {
                mapping: Value::Null,
            },
            routing: Routing::End,
            telemetry: TelemetryHints::default(),
        };
        let mut nodes = indexmap::IndexMap::default();
        nodes.insert(node_id.clone(), node);
        let flow = Flow {
            schema_version: "1.0".into(),
            id: FlowId::from_str("emit.flow").unwrap(),
            kind: FlowKind::Messaging,
            entrypoints: BTreeMap::from([(
                "default".to_string(),
                Value::String(node_id.to_string()),
            )]),
            nodes,
            metadata: Default::default(),
        };

        // Seed the cache directly so `execute` finds the flow without any pack.
        let engine = minimal_engine();
        engine.flow_cache.write().insert(
            FlowKey {
                pack_id: "test-pack".to_string(),
                flow_id: "emit.flow".to_string(),
            },
            HostFlow::from(flow),
        );

        let observer = CountingObserver::new();
        let ctx = FlowContext {
            tenant: "demo",
            pack_id: "test-pack",
            flow_id: "emit.flow",
            node_id: None,
            tool: None,
            action: None,
            session_id: None,
            provider_id: None,
            retry_config: SINGLE_ATTEMPT,
            observer: Some(&observer),
            mocks: None,
        };

        let rt = Runtime::new().unwrap();
        let result = rt.block_on(engine.execute(ctx, Value::Null)).unwrap();
        assert!(matches!(result.status, FlowStatus::Completed));

        let starts = observer.starts.lock().unwrap();
        let ends = observer.ends.lock().unwrap();
        assert_eq!(starts.len(), 1);
        assert_eq!(ends.len(), 1);
        assert_eq!(ends[0], json!({ "message": "logged" }));
    }
}
1665
1666use tracing::Instrument;
1667
/// Borrowed, per-invocation context handed to the flow engine.
///
/// All string fields are borrowed for `'a`, so the caller keeps ownership of the
/// underlying data for the duration of the execution.
pub struct FlowContext<'a> {
    /// Tenant identifier for this execution.
    pub tenant: &'a str,
    /// Identifier of the pack that owns the flow.
    pub pack_id: &'a str,
    /// Identifier of the flow to run.
    pub flow_id: &'a str,
    /// Optional node identifier.
    // NOTE(review): presumably the node currently executing or to resume from;
    // confirm against the callers.
    pub node_id: Option<&'a str>,
    /// Optional tool name associated with the invocation.
    pub tool: Option<&'a str>,
    /// Optional action name associated with the invocation.
    pub action: Option<&'a str>,
    /// Optional session identifier.
    pub session_id: Option<&'a str>,
    /// Optional provider identifier.
    pub provider_id: Option<&'a str>,
    /// Retry policy supplied with this execution.
    pub retry_config: RetryConfig,
    /// Optional observer notified through the `ExecutionObserver` callbacks
    /// (node start, node end, node error).
    pub observer: Option<&'a dyn ExecutionObserver>,
    /// Optional mock layer.
    // NOTE(review): presumably substitutes real calls during tests; confirm.
    pub mocks: Option<&'a MockLayer>,
}
1681
/// Retry policy carried in a `FlowContext`.
///
/// This is a small plain-data value, so it is `Copy`. It also derives `Debug`,
/// `PartialEq` and `Eq` so it can be logged and compared in assertions.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct RetryConfig {
    /// Maximum number of attempts.
    pub max_attempts: u32,
    /// Base delay, in milliseconds.
    pub base_delay_ms: u64,
}
1687
1688fn should_retry(err: &anyhow::Error) -> bool {
1689 let lower = err.to_string().to_lowercase();
1690 lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
1691}
1692
1693impl From<FlowRetryConfig> for RetryConfig {
1694 fn from(value: FlowRetryConfig) -> Self {
1695 Self {
1696 max_attempts: value.max_attempts.max(1),
1697 base_delay_ms: value.base_delay_ms.max(50),
1698 }
1699 }
1700}