1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use super::templating::{TemplateOptions, render_template_value};
18use crate::config::{FlowRetryConfig, HostConfig};
19use crate::pack::{FlowDescriptor, PackRuntime};
20use crate::runner::invocation::{InvocationMeta, build_invocation_envelope};
21use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
22#[cfg(feature = "fault-injection")]
23use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
24use crate::validate::{
25 ValidationConfig, ValidationIssue, ValidationMode, validate_component_envelope,
26 validate_tool_envelope,
27};
28use greentic_types::{Flow, Node, NodeId, Routing};
29
30pub struct FlowEngine {
31 packs: Vec<Arc<PackRuntime>>,
32 flows: Vec<FlowDescriptor>,
33 flow_sources: HashMap<FlowKey, usize>,
34 flow_cache: RwLock<HashMap<FlowKey, HostFlow>>,
35 default_env: String,
36 validation: ValidationConfig,
37}
38
39#[derive(Clone, Debug, PartialEq, Eq, Hash)]
40struct FlowKey {
41 pack_id: String,
42 flow_id: String,
43}
44
45#[derive(Clone, Debug, Serialize, Deserialize)]
46pub struct FlowSnapshot {
47 pub pack_id: String,
48 pub flow_id: String,
49 #[serde(default, skip_serializing_if = "Option::is_none")]
50 pub next_flow: Option<String>,
51 pub next_node: String,
52 pub state: ExecutionState,
53}
54
55#[derive(Clone, Debug)]
56pub struct FlowWait {
57 pub reason: Option<String>,
58 pub snapshot: FlowSnapshot,
59}
60
61#[derive(Clone, Debug)]
62pub enum FlowStatus {
63 Completed,
64 Waiting(Box<FlowWait>),
65}
66
67#[derive(Clone, Debug)]
68pub struct FlowExecution {
69 pub output: Value,
70 pub status: FlowStatus,
71}
72
73#[derive(Clone, Debug)]
74struct HostFlow {
75 id: String,
76 start: Option<NodeId>,
77 nodes: IndexMap<NodeId, HostNode>,
78}
79
80#[derive(Clone, Debug)]
81pub struct HostNode {
82 kind: NodeKind,
83 pub component: String,
85 component_id: String,
86 operation_name: Option<String>,
87 operation_in_mapping: Option<String>,
88 payload_expr: Value,
89 routing: Routing,
90}
91
92impl HostNode {
93 pub fn component_id(&self) -> &str {
94 &self.component_id
95 }
96
97 pub fn operation_name(&self) -> Option<&str> {
98 self.operation_name.as_deref()
99 }
100
101 pub fn operation_in_mapping(&self) -> Option<&str> {
102 self.operation_in_mapping.as_deref()
103 }
104}
105
106#[derive(Clone, Debug)]
107enum NodeKind {
108 Exec { target_component: String },
109 PackComponent { component_ref: String },
110 ProviderInvoke,
111 FlowCall,
112 BuiltinEmit { kind: EmitKind },
113 Wait,
114}
115
116#[derive(Clone, Debug)]
117enum EmitKind {
118 Log,
119 Response,
120 Other(String),
121}
122
123struct ComponentOverrides<'a> {
124 component: Option<&'a str>,
125 operation: Option<&'a str>,
126}
127
128struct ComponentCall {
129 component_ref: String,
130 operation: String,
131 input: Value,
132 config: Value,
133}
134
135impl FlowExecution {
136 fn completed(output: Value) -> Self {
137 Self {
138 output,
139 status: FlowStatus::Completed,
140 }
141 }
142
143 fn waiting(output: Value, wait: FlowWait) -> Self {
144 Self {
145 output,
146 status: FlowStatus::Waiting(Box::new(wait)),
147 }
148 }
149}
150
151impl FlowEngine {
152 pub async fn new(packs: Vec<Arc<PackRuntime>>, config: Arc<HostConfig>) -> Result<Self> {
153 let mut flow_sources: HashMap<FlowKey, usize> = HashMap::new();
154 let mut descriptors = Vec::new();
155 let mut bindings = HashMap::new();
156 for pack in &config.pack_bindings {
157 bindings.insert(pack.pack_id.clone(), pack.flows.clone());
158 }
159 let enforce_bindings = !bindings.is_empty();
160 for (idx, pack) in packs.iter().enumerate() {
161 let pack_id = pack.metadata().pack_id.clone();
162 if enforce_bindings && !bindings.contains_key(&pack_id) {
163 bail!("no gtbind entries found for pack {}", pack_id);
164 }
165 let flows = pack.list_flows().await?;
166 let allowed = bindings.get(&pack_id).map(|flows| {
167 flows
168 .iter()
169 .cloned()
170 .collect::<std::collections::HashSet<_>>()
171 });
172 let mut seen = std::collections::HashSet::new();
173 for flow in flows {
174 if let Some(ref allow) = allowed
175 && !allow.contains(&flow.id)
176 {
177 continue;
178 }
179 seen.insert(flow.id.clone());
180 tracing::info!(
181 flow_id = %flow.id,
182 flow_type = %flow.flow_type,
183 pack_id = %flow.pack_id,
184 pack_index = idx,
185 "registered flow"
186 );
187 flow_sources.insert(
188 FlowKey {
189 pack_id: flow.pack_id.clone(),
190 flow_id: flow.id.clone(),
191 },
192 idx,
193 );
194 descriptors.retain(|existing: &FlowDescriptor| {
195 !(existing.id == flow.id && existing.pack_id == flow.pack_id)
196 });
197 descriptors.push(flow);
198 }
199 if let Some(allow) = allowed {
200 let missing = allow.difference(&seen).cloned().collect::<Vec<_>>();
201 if !missing.is_empty() {
202 bail!(
203 "gtbind flow ids missing in pack {}: {}",
204 pack_id,
205 missing.join(", ")
206 );
207 }
208 }
209 }
210
211 let mut flow_map = HashMap::new();
212 for flow in &descriptors {
213 let pack_id = flow.pack_id.clone();
214 if let Some(&pack_idx) = flow_sources.get(&FlowKey {
215 pack_id: pack_id.clone(),
216 flow_id: flow.id.clone(),
217 }) {
218 let pack_clone = Arc::clone(&packs[pack_idx]);
219 let flow_id = flow.id.clone();
220 let task_flow_id = flow_id.clone();
221 match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
222 Ok(Ok(loaded_flow)) => {
223 flow_map.insert(
224 FlowKey {
225 pack_id: pack_id.clone(),
226 flow_id,
227 },
228 HostFlow::from(loaded_flow),
229 );
230 }
231 Ok(Err(err)) => {
232 tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
233 }
234 Err(err) => {
235 tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
236 }
237 }
238 }
239 }
240
241 Ok(Self {
242 packs,
243 flows: descriptors,
244 flow_sources,
245 flow_cache: RwLock::new(flow_map),
246 default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
247 validation: config.validation.clone(),
248 })
249 }
250
251 async fn get_or_load_flow(&self, pack_id: &str, flow_id: &str) -> Result<HostFlow> {
252 let key = FlowKey {
253 pack_id: pack_id.to_string(),
254 flow_id: flow_id.to_string(),
255 };
256 if let Some(flow) = self.flow_cache.read().get(&key).cloned() {
257 return Ok(flow);
258 }
259
260 let pack_idx = *self
261 .flow_sources
262 .get(&key)
263 .with_context(|| format!("flow {pack_id}:{flow_id} not registered"))?;
264 let pack = Arc::clone(&self.packs[pack_idx]);
265 let flow_id_owned = flow_id.to_string();
266 let task_flow_id = flow_id_owned.clone();
267 let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
268 .await
269 .context("failed to join flow metadata task")??;
270 let host_flow = HostFlow::from(flow);
271 self.flow_cache.write().insert(
272 FlowKey {
273 pack_id: pack_id.to_string(),
274 flow_id: flow_id_owned.clone(),
275 },
276 host_flow.clone(),
277 );
278 Ok(host_flow)
279 }
280
281 pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
282 let span = tracing::info_span!(
283 "flow.execute",
284 tenant = tracing::field::Empty,
285 flow_id = tracing::field::Empty,
286 node_id = tracing::field::Empty,
287 tool = tracing::field::Empty,
288 action = tracing::field::Empty
289 );
290 annotate_span(
291 &span,
292 &FlowSpanAttributes {
293 tenant: ctx.tenant,
294 flow_id: ctx.flow_id,
295 node_id: ctx.node_id,
296 tool: ctx.tool,
297 action: ctx.action,
298 },
299 );
300 set_flow_context(
301 &self.default_env,
302 ctx.tenant,
303 ctx.flow_id,
304 ctx.node_id,
305 ctx.provider_id,
306 ctx.session_id,
307 );
308 let retry_config = ctx.retry_config;
309 let original_input = input;
310 let mut ctx = ctx;
311 async move {
312 let mut attempt = 0u32;
313 loop {
314 attempt += 1;
315 ctx.attempt = attempt;
316 #[cfg(feature = "fault-injection")]
317 {
318 let fault_ctx = FaultContext {
319 pack_id: ctx.pack_id,
320 flow_id: ctx.flow_id,
321 node_id: ctx.node_id,
322 attempt: ctx.attempt,
323 };
324 maybe_fail(FaultPoint::Timeout, fault_ctx)
325 .map_err(|err| anyhow!(err.to_string()))?;
326 }
327 match self.execute_once(&ctx, original_input.clone()).await {
328 Ok(value) => return Ok(value),
329 Err(err) => {
330 if attempt >= retry_config.max_attempts || !should_retry(&err) {
331 return Err(err);
332 }
333 let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
334 tracing::warn!(
335 tenant = ctx.tenant,
336 flow_id = ctx.flow_id,
337 attempt,
338 max_attempts = retry_config.max_attempts,
339 delay_ms = delay,
340 error = %err,
341 "transient flow execution failure, backing off"
342 );
343 tokio::time::sleep(Duration::from_millis(delay)).await;
344 }
345 }
346 }
347 }
348 .instrument(span)
349 .await
350 }
351
352 pub async fn resume(
353 &self,
354 ctx: FlowContext<'_>,
355 snapshot: FlowSnapshot,
356 input: Value,
357 ) -> Result<FlowExecution> {
358 if snapshot.pack_id != ctx.pack_id {
359 bail!(
360 "snapshot pack {} does not match requested {}",
361 snapshot.pack_id,
362 ctx.pack_id
363 );
364 }
365 let resume_flow = snapshot
366 .next_flow
367 .clone()
368 .unwrap_or_else(|| snapshot.flow_id.clone());
369 let flow_ir = self.get_or_load_flow(ctx.pack_id, &resume_flow).await?;
370 let mut state = snapshot.state;
371 state.replace_input(input);
372 state.ensure_entry();
373 self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node), resume_flow)
374 .await
375 }
376
377 async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
378 let flow_ir = self.get_or_load_flow(ctx.pack_id, ctx.flow_id).await?;
379 let state = ExecutionState::new(input);
380 self.drive_flow(ctx, flow_ir, state, None, ctx.flow_id.to_string())
381 .await
382 }
383
384 async fn drive_flow(
385 &self,
386 ctx: &FlowContext<'_>,
387 mut flow_ir: HostFlow,
388 mut state: ExecutionState,
389 resume_from: Option<String>,
390 mut current_flow_id: String,
391 ) -> Result<FlowExecution> {
392 let mut current = match resume_from {
393 Some(node) => NodeId::from_str(&node)
394 .with_context(|| format!("invalid resume node id `{node}`"))?,
395 None => flow_ir
396 .start
397 .clone()
398 .or_else(|| flow_ir.nodes.keys().next().cloned())
399 .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
400 };
401
402 loop {
403 let step_ctx = FlowContext {
404 tenant: ctx.tenant,
405 pack_id: ctx.pack_id,
406 flow_id: current_flow_id.as_str(),
407 node_id: ctx.node_id,
408 tool: ctx.tool,
409 action: ctx.action,
410 session_id: ctx.session_id,
411 provider_id: ctx.provider_id,
412 retry_config: ctx.retry_config,
413 attempt: ctx.attempt,
414 observer: ctx.observer,
415 mocks: ctx.mocks,
416 };
417 let node = flow_ir
418 .nodes
419 .get(¤t)
420 .with_context(|| format!("node {} not found", current.as_str()))?;
421
422 let payload_template = node.payload_expr.clone();
423 let prev = state
424 .last_output
425 .as_ref()
426 .cloned()
427 .unwrap_or_else(|| Value::Object(JsonMap::new()));
428 let ctx_value = template_context(&state, prev);
429 #[cfg(feature = "fault-injection")]
430 {
431 let fault_ctx = FaultContext {
432 pack_id: ctx.pack_id,
433 flow_id: ctx.flow_id,
434 node_id: Some(current.as_str()),
435 attempt: ctx.attempt,
436 };
437 maybe_fail(FaultPoint::TemplateRender, fault_ctx)
438 .map_err(|err| anyhow!(err.to_string()))?;
439 }
440 let payload =
441 render_template_value(&payload_template, &ctx_value, TemplateOptions::default())
442 .context("failed to render node input template")?;
443 let observed_payload = payload.clone();
444 let node_id = current.clone();
445 let event = NodeEvent {
446 context: &step_ctx,
447 node_id: node_id.as_str(),
448 node,
449 payload: &observed_payload,
450 };
451 if let Some(observer) = step_ctx.observer {
452 observer.on_node_start(&event);
453 }
454 let dispatch = self
455 .dispatch_node(
456 &step_ctx,
457 node_id.as_str(),
458 node,
459 &mut state,
460 payload,
461 &event,
462 )
463 .await;
464 let DispatchOutcome { output, control } = match dispatch {
465 Ok(outcome) => outcome,
466 Err(err) => {
467 if let Some(observer) = step_ctx.observer {
468 observer.on_node_error(&event, err.as_ref());
469 }
470 return Err(err);
471 }
472 };
473
474 state.nodes.insert(node_id.clone().into(), output.clone());
475 state.last_output = Some(output.payload.clone());
476 if let Some(observer) = step_ctx.observer {
477 observer.on_node_end(&event, &output.payload);
478 }
479
480 match control {
481 NodeControl::Continue => {
482 let (next, should_exit) = match &node.routing {
483 Routing::Next { node_id } => (Some(node_id.clone()), false),
484 Routing::End | Routing::Reply => (None, true),
485 Routing::Branch { default, .. } => (default.clone(), default.is_none()),
486 Routing::Custom(raw) => {
487 tracing::warn!(
488 flow_id = %flow_ir.id,
489 node_id = %node_id,
490 routing = ?raw,
491 "unsupported routing; terminating flow"
492 );
493 (None, true)
494 }
495 };
496
497 if should_exit {
498 return Ok(FlowExecution::completed(
499 state.finalize_with(Some(output.payload.clone())),
500 ));
501 }
502
503 match next {
504 Some(n) => current = n,
505 None => {
506 return Ok(FlowExecution::completed(
507 state.finalize_with(Some(output.payload.clone())),
508 ));
509 }
510 }
511 }
512 NodeControl::Wait { reason } => {
513 let (next, _) = match &node.routing {
514 Routing::Next { node_id } => (Some(node_id.clone()), false),
515 Routing::End | Routing::Reply => (None, true),
516 Routing::Branch { default, .. } => (default.clone(), default.is_none()),
517 Routing::Custom(raw) => {
518 tracing::warn!(
519 flow_id = %flow_ir.id,
520 node_id = %node_id,
521 routing = ?raw,
522 "unsupported routing for wait; terminating flow"
523 );
524 (None, true)
525 }
526 };
527 let resume_target = next.ok_or_else(|| {
528 anyhow!(
529 "session.wait node {} requires a non-empty route",
530 current.as_str()
531 )
532 })?;
533 let mut snapshot_state = state.clone();
534 snapshot_state.clear_egress();
535 let snapshot = FlowSnapshot {
536 pack_id: step_ctx.pack_id.to_string(),
537 flow_id: step_ctx.flow_id.to_string(),
538 next_flow: (current_flow_id != step_ctx.flow_id)
539 .then_some(current_flow_id.clone()),
540 next_node: resume_target.as_str().to_string(),
541 state: snapshot_state,
542 };
543 let output_value = state.clone().finalize_with(None);
544 return Ok(FlowExecution::waiting(
545 output_value,
546 FlowWait { reason, snapshot },
547 ));
548 }
549 NodeControl::Jump(jump) => {
550 let jump_target = self.apply_jump(&step_ctx, &mut state, jump).await?;
551 flow_ir = jump_target.flow;
552 current_flow_id = jump_target.flow_id;
553 current = jump_target.node_id;
554 }
555 NodeControl::Respond {
556 text,
557 card_cbor,
558 needs_user,
559 } => {
560 let response = json!({
561 "text": text,
562 "card_cbor": card_cbor,
563 "needs_user": needs_user,
564 });
565 state.push_egress(response);
566 return Ok(FlowExecution::completed(state.finalize_with(None)));
567 }
568 }
569 }
570 }
571
572 async fn dispatch_node(
573 &self,
574 ctx: &FlowContext<'_>,
575 node_id: &str,
576 node: &HostNode,
577 state: &mut ExecutionState,
578 payload: Value,
579 event: &NodeEvent<'_>,
580 ) -> Result<DispatchOutcome> {
581 match &node.kind {
582 NodeKind::Exec { target_component } => self
583 .execute_component_exec(
584 ctx,
585 node_id,
586 node,
587 payload,
588 event,
589 ComponentOverrides {
590 component: Some(target_component.as_str()),
591 operation: node.operation_name.as_deref(),
592 },
593 )
594 .await
595 .and_then(component_dispatch_outcome),
596 NodeKind::PackComponent { component_ref } => self
597 .execute_component_call(ctx, node_id, node, payload, component_ref.as_str(), event)
598 .await
599 .and_then(component_dispatch_outcome),
600 NodeKind::FlowCall => self
601 .execute_flow_call(ctx, payload)
602 .await
603 .map(DispatchOutcome::complete),
604 NodeKind::ProviderInvoke => self
605 .execute_provider_invoke(ctx, node_id, state, payload, event)
606 .await
607 .map(DispatchOutcome::complete),
608 NodeKind::BuiltinEmit { kind } => {
609 match kind {
610 EmitKind::Log | EmitKind::Response => {}
611 EmitKind::Other(component) => {
612 tracing::debug!(%component, "handling emit.* as builtin");
613 }
614 }
615 state.push_egress(payload.clone());
616 Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
617 }
618 NodeKind::Wait => {
619 let reason = extract_wait_reason(&payload);
620 Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
621 }
622 }
623 }
624
625 async fn apply_jump(
626 &self,
627 ctx: &FlowContext<'_>,
628 state: &mut ExecutionState,
629 jump: JumpControl,
630 ) -> Result<JumpTarget> {
631 let target_flow = jump.flow.trim();
632 if target_flow.is_empty() {
633 bail!("missing_flow");
634 }
635
636 let flow = self
637 .get_or_load_flow(ctx.pack_id, target_flow)
638 .await
639 .with_context(|| format!("unknown_flow:{target_flow}"))?;
640
641 let target_node = if let Some(node) = jump.node.as_deref() {
642 let parsed = NodeId::from_str(node).with_context(|| format!("unknown_node:{node}"))?;
643 if !flow.nodes.contains_key(&parsed) {
644 bail!("unknown_node:{node}");
645 }
646 parsed
647 } else {
648 flow.start
649 .clone()
650 .or_else(|| flow.nodes.keys().next().cloned())
651 .ok_or_else(|| anyhow!("jump_failed: flow {target_flow} has no start node"))?
652 };
653
654 let max_redirects = jump.max_redirects.unwrap_or(3);
655 if state.redirect_count() >= max_redirects {
656 bail!("redirect_limit");
657 }
658 state.increment_redirect_count();
659 state.replace_input(jump.payload.clone());
660 state.last_output = Some(jump.payload);
661 tracing::info!(
662 flow_id = %ctx.flow_id,
663 target_flow = %target_flow,
664 target_node = %target_node.as_str(),
665 reason = ?jump.reason,
666 redirects = state.redirect_count(),
667 "flow.jump.applied"
668 );
669
670 Ok(JumpTarget {
671 flow_id: target_flow.to_string(),
672 flow,
673 node_id: target_node,
674 })
675 }
676
677 async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
678 #[derive(Deserialize)]
679 struct FlowCallPayload {
680 #[serde(alias = "flow")]
681 flow_id: String,
682 #[serde(default)]
683 input: Value,
684 }
685
686 let call: FlowCallPayload =
687 serde_json::from_value(payload).context("invalid payload for flow.call node")?;
688 if call.flow_id.trim().is_empty() {
689 bail!("flow.call requires a non-empty flow_id");
690 }
691
692 let sub_input = if call.input.is_null() {
693 Value::Null
694 } else {
695 call.input
696 };
697
698 let flow_id_owned = call.flow_id;
699 let action = "flow.call";
700 let sub_ctx = FlowContext {
701 tenant: ctx.tenant,
702 pack_id: ctx.pack_id,
703 flow_id: flow_id_owned.as_str(),
704 node_id: None,
705 tool: ctx.tool,
706 action: Some(action),
707 session_id: ctx.session_id,
708 provider_id: ctx.provider_id,
709 retry_config: ctx.retry_config,
710 attempt: ctx.attempt,
711 observer: ctx.observer,
712 mocks: ctx.mocks,
713 };
714
715 let execution = Box::pin(self.execute(sub_ctx, sub_input))
716 .await
717 .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
718 match execution.status {
719 FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
720 FlowStatus::Waiting(wait) => bail!(
721 "flow.call cannot pause (flow {} waiting {:?})",
722 flow_id_owned,
723 wait.reason
724 ),
725 }
726 }
727
728 async fn execute_component_exec(
729 &self,
730 ctx: &FlowContext<'_>,
731 node_id: &str,
732 node: &HostNode,
733 payload: Value,
734 event: &NodeEvent<'_>,
735 overrides: ComponentOverrides<'_>,
736 ) -> Result<NodeOutput> {
737 #[derive(Deserialize)]
738 struct ComponentPayload {
739 #[serde(default, alias = "component_ref", alias = "component")]
740 component: Option<String>,
741 #[serde(alias = "op")]
742 operation: Option<String>,
743 #[serde(default)]
744 input: Value,
745 #[serde(default)]
746 config: Value,
747 }
748
749 let payload: ComponentPayload =
750 serde_json::from_value(payload).context("invalid payload for component.exec")?;
751 let component_ref = overrides
752 .component
753 .map(str::to_string)
754 .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
755 .with_context(|| "component.exec requires a component_ref")?;
756 let operation = resolve_component_operation(
757 node_id,
758 node.component_id.as_str(),
759 payload.operation,
760 overrides.operation,
761 node.operation_in_mapping.as_deref(),
762 )?;
763 let call = ComponentCall {
764 component_ref,
765 operation,
766 input: payload.input,
767 config: payload.config,
768 };
769
770 self.invoke_component_call(ctx, node_id, call, event).await
771 }
772
773 async fn execute_component_call(
774 &self,
775 ctx: &FlowContext<'_>,
776 node_id: &str,
777 node: &HostNode,
778 payload: Value,
779 component_ref: &str,
780 event: &NodeEvent<'_>,
781 ) -> Result<NodeOutput> {
782 let payload_operation = extract_operation_from_mapping(&payload);
783 let (input, config) = split_operation_payload(payload);
784 let operation = resolve_component_operation(
785 node_id,
786 node.component_id.as_str(),
787 payload_operation,
788 node.operation_name.as_deref(),
789 node.operation_in_mapping.as_deref(),
790 )?;
791 let call = ComponentCall {
792 component_ref: component_ref.to_string(),
793 operation,
794 input,
795 config,
796 };
797 self.invoke_component_call(ctx, node_id, call, event).await
798 }
799
800 async fn invoke_component_call(
801 &self,
802 ctx: &FlowContext<'_>,
803 node_id: &str,
804 mut call: ComponentCall,
805 event: &NodeEvent<'_>,
806 ) -> Result<NodeOutput> {
807 self.validate_component(ctx, event, &call)?;
808 let key = FlowKey {
809 pack_id: ctx.pack_id.to_string(),
810 flow_id: ctx.flow_id.to_string(),
811 };
812 let pack_idx = *self.flow_sources.get(&key).with_context(|| {
813 format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
814 })?;
815 let pack = Arc::clone(&self.packs[pack_idx]);
816
817 resolve_card_assets(&mut call.input, &pack);
821
822 let is_card = is_card_invocation(&call.input);
830
831 let input_json = if is_card {
832 serde_json::to_string(&call.input)?
833 } else {
834 let meta = InvocationMeta {
836 env: &self.default_env,
837 tenant: ctx.tenant,
838 flow_id: ctx.flow_id,
839 node_id: Some(node_id),
840 provider_id: ctx.provider_id,
841 session_id: ctx.session_id,
842 attempt: ctx.attempt,
843 };
844 let invocation_envelope =
845 build_invocation_envelope(meta, call.operation.as_str(), call.input)
846 .context("build invocation envelope for component call")?;
847 serde_json::to_string(&invocation_envelope)?
848 };
849 let config_json = if call.config.is_null() {
850 None
851 } else {
852 Some(serde_json::to_string(&call.config)?)
853 };
854
855 let exec_ctx = component_exec_ctx(ctx, node_id);
856 #[cfg(feature = "fault-injection")]
857 {
858 let fault_ctx = FaultContext {
859 pack_id: ctx.pack_id,
860 flow_id: ctx.flow_id,
861 node_id: Some(node_id),
862 attempt: ctx.attempt,
863 };
864 maybe_fail(FaultPoint::BeforeComponentCall, fault_ctx)
865 .map_err(|err| anyhow!(err.to_string()))?;
866 }
867 let value = pack
868 .invoke_component(
869 call.component_ref.as_str(),
870 exec_ctx,
871 call.operation.as_str(),
872 config_json,
873 input_json,
874 )
875 .await?;
876 #[cfg(feature = "fault-injection")]
877 {
878 let fault_ctx = FaultContext {
879 pack_id: ctx.pack_id,
880 flow_id: ctx.flow_id,
881 node_id: Some(node_id),
882 attempt: ctx.attempt,
883 };
884 maybe_fail(FaultPoint::AfterComponentCall, fault_ctx)
885 .map_err(|err| anyhow!(err.to_string()))?;
886 }
887
888 if let Some((code, message)) = component_error(&value) {
889 bail!(
890 "component {} failed: {}: {}",
891 call.component_ref,
892 code,
893 message
894 );
895 }
896 Ok(NodeOutput::new(value))
897 }
898
899 async fn execute_provider_invoke(
900 &self,
901 ctx: &FlowContext<'_>,
902 node_id: &str,
903 state: &ExecutionState,
904 payload: Value,
905 event: &NodeEvent<'_>,
906 ) -> Result<NodeOutput> {
907 #[derive(Deserialize)]
908 struct ProviderPayload {
909 #[serde(default)]
910 provider_id: Option<String>,
911 #[serde(default)]
912 provider_type: Option<String>,
913 #[serde(default, alias = "operation")]
914 op: Option<String>,
915 #[serde(default)]
916 input: Value,
917 #[serde(default)]
918 in_map: Value,
919 #[serde(default)]
920 out_map: Value,
921 #[serde(default)]
922 err_map: Value,
923 }
924
925 let payload: ProviderPayload =
926 serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
927 let op = payload
928 .op
929 .as_deref()
930 .filter(|v| !v.trim().is_empty())
931 .with_context(|| "provider.invoke requires an op")?
932 .to_string();
933
934 let prev = state
935 .last_output
936 .as_ref()
937 .cloned()
938 .unwrap_or_else(|| Value::Object(JsonMap::new()));
939 let base_ctx = template_context(state, prev);
940
941 let input_value = if !payload.in_map.is_null() {
942 let mut ctx_value = base_ctx.clone();
943 if let Value::Object(ref mut map) = ctx_value {
944 map.insert("input".into(), payload.input.clone());
945 map.insert("result".into(), payload.input.clone());
946 }
947 render_template_value(
948 &payload.in_map,
949 &ctx_value,
950 TemplateOptions {
951 allow_pointer: true,
952 },
953 )
954 .context("failed to render provider.invoke in_map")?
955 } else if !payload.input.is_null() {
956 payload.input
957 } else {
958 Value::Null
959 };
960 let input_json = serde_json::to_vec(&input_value)?;
961
962 self.validate_tool(
963 ctx,
964 event,
965 payload.provider_id.as_deref(),
966 payload.provider_type.as_deref(),
967 &op,
968 &input_value,
969 )?;
970
971 let key = FlowKey {
972 pack_id: ctx.pack_id.to_string(),
973 flow_id: ctx.flow_id.to_string(),
974 };
975 let pack_idx = *self.flow_sources.get(&key).with_context(|| {
976 format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
977 })?;
978 let pack = Arc::clone(&self.packs[pack_idx]);
979 let binding = pack.resolve_provider(
980 payload.provider_id.as_deref(),
981 payload.provider_type.as_deref(),
982 )?;
983 let exec_ctx = component_exec_ctx(ctx, node_id);
984 #[cfg(feature = "fault-injection")]
985 {
986 let fault_ctx = FaultContext {
987 pack_id: ctx.pack_id,
988 flow_id: ctx.flow_id,
989 node_id: Some(node_id),
990 attempt: ctx.attempt,
991 };
992 maybe_fail(FaultPoint::BeforeToolCall, fault_ctx)
993 .map_err(|err| anyhow!(err.to_string()))?;
994 }
995 let result = pack
996 .invoke_provider(&binding, exec_ctx, &op, input_json)
997 .await?;
998 #[cfg(feature = "fault-injection")]
999 {
1000 let fault_ctx = FaultContext {
1001 pack_id: ctx.pack_id,
1002 flow_id: ctx.flow_id,
1003 node_id: Some(node_id),
1004 attempt: ctx.attempt,
1005 };
1006 maybe_fail(FaultPoint::AfterToolCall, fault_ctx)
1007 .map_err(|err| anyhow!(err.to_string()))?;
1008 }
1009
1010 let output = if payload.out_map.is_null() {
1011 result
1012 } else {
1013 let mut ctx_value = base_ctx;
1014 if let Value::Object(ref mut map) = ctx_value {
1015 map.insert("input".into(), result.clone());
1016 map.insert("result".into(), result.clone());
1017 }
1018 render_template_value(
1019 &payload.out_map,
1020 &ctx_value,
1021 TemplateOptions {
1022 allow_pointer: true,
1023 },
1024 )
1025 .context("failed to render provider.invoke out_map")?
1026 };
1027 let _ = payload.err_map;
1028 Ok(NodeOutput::new(output))
1029 }
1030
1031 fn validate_component(
1032 &self,
1033 ctx: &FlowContext<'_>,
1034 event: &NodeEvent<'_>,
1035 call: &ComponentCall,
1036 ) -> Result<()> {
1037 if self.validation.mode == ValidationMode::Off {
1038 return Ok(());
1039 }
1040 let mut metadata = JsonMap::new();
1041 metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
1042 if let Some(id) = ctx.session_id {
1043 metadata.insert("session".to_string(), json!({ "id": id }));
1044 }
1045 let envelope = json!({
1046 "component_id": call.component_ref,
1047 "operation": call.operation,
1048 "input": call.input,
1049 "config": call.config,
1050 "metadata": Value::Object(metadata),
1051 });
1052 let issues = validate_component_envelope(&envelope);
1053 self.report_validation(ctx, event, "component", issues)
1054 }
1055
1056 fn validate_tool(
1057 &self,
1058 ctx: &FlowContext<'_>,
1059 event: &NodeEvent<'_>,
1060 provider_id: Option<&str>,
1061 provider_type: Option<&str>,
1062 operation: &str,
1063 input: &Value,
1064 ) -> Result<()> {
1065 if self.validation.mode == ValidationMode::Off {
1066 return Ok(());
1067 }
1068 let tool_id = provider_id.or(provider_type).unwrap_or("provider.invoke");
1069 let mut metadata = JsonMap::new();
1070 metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
1071 if let Some(id) = ctx.session_id {
1072 metadata.insert("session".to_string(), json!({ "id": id }));
1073 }
1074 let envelope = json!({
1075 "tool_id": tool_id,
1076 "operation": operation,
1077 "input": input,
1078 "metadata": Value::Object(metadata),
1079 });
1080 let issues = validate_tool_envelope(&envelope);
1081 self.report_validation(ctx, event, "tool", issues)
1082 }
1083
1084 fn report_validation(
1085 &self,
1086 ctx: &FlowContext<'_>,
1087 event: &NodeEvent<'_>,
1088 kind: &str,
1089 issues: Vec<ValidationIssue>,
1090 ) -> Result<()> {
1091 if issues.is_empty() {
1092 return Ok(());
1093 }
1094 if let Some(observer) = ctx.observer {
1095 observer.on_validation(event, &issues);
1096 }
1097 match self.validation.mode {
1098 ValidationMode::Warn => {
1099 tracing::warn!(
1100 tenant = ctx.tenant,
1101 flow_id = ctx.flow_id,
1102 node_id = event.node_id,
1103 kind,
1104 issues = ?issues,
1105 "invocation envelope validation issues"
1106 );
1107 Ok(())
1108 }
1109 ValidationMode::Error => {
1110 tracing::error!(
1111 tenant = ctx.tenant,
1112 flow_id = ctx.flow_id,
1113 node_id = event.node_id,
1114 kind,
1115 issues = ?issues,
1116 "invocation envelope validation failed"
1117 );
1118 bail!("invocation_validation_failed");
1119 }
1120 ValidationMode::Off => Ok(()),
1121 }
1122 }
1123
1124 pub fn flows(&self) -> &[FlowDescriptor] {
1125 &self.flows
1126 }
1127
1128 pub fn flow_by_key(&self, pack_id: &str, flow_id: &str) -> Option<&FlowDescriptor> {
1129 self.flows
1130 .iter()
1131 .find(|descriptor| descriptor.pack_id == pack_id && descriptor.id == flow_id)
1132 }
1133
1134 pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
1135 let mut matches = self
1136 .flows
1137 .iter()
1138 .filter(|descriptor| descriptor.flow_type == flow_type);
1139 let first = matches.next()?;
1140 if matches.next().is_some() {
1141 return None;
1142 }
1143 Some(first)
1144 }
1145
1146 pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
1147 let mut matches = self
1148 .flows
1149 .iter()
1150 .filter(|descriptor| descriptor.id == flow_id);
1151 let first = matches.next()?;
1152 if matches.next().is_some() {
1153 return None;
1154 }
1155 Some(first)
1156 }
1157}
1158
1159pub trait ExecutionObserver: Send + Sync {
1160 fn on_node_start(&self, event: &NodeEvent<'_>);
1161 fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
1162 fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
1163 fn on_validation(&self, _event: &NodeEvent<'_>, _issues: &[ValidationIssue]) {}
1164}
1165
1166pub struct NodeEvent<'a> {
1167 pub context: &'a FlowContext<'a>,
1168 pub node_id: &'a str,
1169 pub node: &'a HostNode,
1170 pub payload: &'a Value,
1171}
1172
1173#[derive(Clone, Debug, Serialize, Deserialize)]
1174pub struct ExecutionState {
1175 #[serde(default)]
1176 entry: Value,
1177 #[serde(default)]
1178 input: Value,
1179 #[serde(default)]
1180 nodes: HashMap<String, NodeOutput>,
1181 #[serde(default)]
1182 egress: Vec<Value>,
1183 #[serde(default, skip_serializing_if = "Option::is_none")]
1184 last_output: Option<Value>,
1185 #[serde(default)]
1186 redirect_count: u32,
1187}
1188
1189impl ExecutionState {
1190 fn new(input: Value) -> Self {
1191 Self {
1192 entry: input.clone(),
1193 input,
1194 nodes: HashMap::new(),
1195 egress: Vec::new(),
1196 last_output: None,
1197 redirect_count: 0,
1198 }
1199 }
1200
1201 fn ensure_entry(&mut self) {
1202 if self.entry.is_null() {
1203 self.entry = self.input.clone();
1204 }
1205 }
1206
1207 fn context(&self) -> Value {
1208 let mut nodes = JsonMap::new();
1209 for (id, output) in &self.nodes {
1210 nodes.insert(
1211 id.clone(),
1212 json!({
1213 "ok": output.ok,
1214 "payload": output.payload.clone(),
1215 "meta": output.meta.clone(),
1216 }),
1217 );
1218 }
1219 json!({
1220 "entry": self.entry.clone(),
1221 "input": self.input.clone(),
1222 "nodes": nodes,
1223 "redirect_count": self.redirect_count,
1224 })
1225 }
1226
1227 fn outputs_map(&self) -> JsonMap<String, Value> {
1228 let mut outputs = JsonMap::new();
1229 for (id, output) in &self.nodes {
1230 outputs.insert(id.clone(), output.payload.clone());
1231 }
1232 outputs
1233 }
1234 fn push_egress(&mut self, payload: Value) {
1235 self.egress.push(payload);
1236 }
1237
1238 fn replace_input(&mut self, input: Value) {
1239 self.input = input;
1240 }
1241
1242 fn clear_egress(&mut self) {
1243 self.egress.clear();
1244 }
1245
1246 fn redirect_count(&self) -> u32 {
1247 self.redirect_count
1248 }
1249
1250 fn increment_redirect_count(&mut self) {
1251 self.redirect_count = self.redirect_count.saturating_add(1);
1252 }
1253
1254 fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
1255 if self.egress.is_empty() {
1256 return final_payload.unwrap_or(Value::Null);
1257 }
1258 let mut emitted = std::mem::take(&mut self.egress);
1259 if let Some(value) = final_payload {
1260 match value {
1261 Value::Null => {}
1262 Value::Array(items) => emitted.extend(items),
1263 other => emitted.push(other),
1264 }
1265 }
1266 Value::Array(emitted)
1267 }
1268}
1269
1270#[derive(Clone, Debug, Serialize, Deserialize)]
1271struct NodeOutput {
1272 ok: bool,
1273 payload: Value,
1274 meta: Value,
1275}
1276
1277impl NodeOutput {
1278 fn new(payload: Value) -> Self {
1279 Self {
1280 ok: true,
1281 payload,
1282 meta: Value::Null,
1283 }
1284 }
1285}
1286
1287struct DispatchOutcome {
1288 output: NodeOutput,
1289 control: NodeControl,
1290}
1291
1292impl DispatchOutcome {
1293 fn complete(output: NodeOutput) -> Self {
1294 Self {
1295 output,
1296 control: NodeControl::Continue,
1297 }
1298 }
1299
1300 fn wait(output: NodeOutput, reason: Option<String>) -> Self {
1301 Self {
1302 output,
1303 control: NodeControl::Wait { reason },
1304 }
1305 }
1306
1307 fn with_control(output: NodeOutput, control: NodeControl) -> Self {
1308 Self { output, control }
1309 }
1310}
1311
1312#[derive(Clone, Debug)]
1313enum NodeControl {
1314 Continue,
1315 Wait {
1316 reason: Option<String>,
1317 },
1318 Jump(JumpControl),
1319 Respond {
1320 text: Option<String>,
1321 card_cbor: Option<Vec<u8>>,
1322 needs_user: Option<bool>,
1323 },
1324}
1325
1326#[derive(Clone, Debug)]
1327struct JumpControl {
1328 flow: String,
1329 node: Option<String>,
1330 payload: Value,
1331 hints: Value,
1332 max_redirects: Option<u32>,
1333 reason: Option<String>,
1334}
1335
1336#[derive(Clone, Debug)]
1337struct JumpTarget {
1338 flow_id: String,
1339 flow: HostFlow,
1340 node_id: NodeId,
1341}
1342
1343impl NodeOutput {
1344 fn with_meta(payload: Value, meta: Value) -> Self {
1345 Self {
1346 ok: true,
1347 payload,
1348 meta,
1349 }
1350 }
1351}
1352
1353fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
1354 ComponentExecCtx {
1355 tenant: ComponentTenantCtx {
1356 tenant: ctx.tenant.to_string(),
1357 team: None,
1358 user: ctx.provider_id.map(str::to_string),
1359 trace_id: None,
1360 i18n_id: None,
1361 correlation_id: ctx.session_id.map(str::to_string),
1362 deadline_unix_ms: None,
1363 attempt: ctx.attempt,
1364 idempotency_key: ctx.session_id.map(str::to_string),
1365 },
1366 i18n_id: None,
1367 flow_id: ctx.flow_id.to_string(),
1368 node_id: Some(node_id.to_string()),
1369 }
1370}
1371
1372fn component_error(value: &Value) -> Option<(String, String)> {
1373 let obj = value.as_object()?;
1374 let ok = obj.get("ok").and_then(Value::as_bool)?;
1375 if ok {
1376 return None;
1377 }
1378 let err = obj.get("error")?.as_object()?;
1379 let code = err
1380 .get("code")
1381 .and_then(Value::as_str)
1382 .unwrap_or("component_error");
1383 let message = err
1384 .get("message")
1385 .and_then(Value::as_str)
1386 .unwrap_or("component reported error");
1387 Some((code.to_string(), message.to_string()))
1388}
1389
1390fn extract_wait_reason(payload: &Value) -> Option<String> {
1391 match payload {
1392 Value::String(s) => Some(s.clone()),
1393 Value::Object(map) => map
1394 .get("reason")
1395 .and_then(Value::as_str)
1396 .map(|value| value.to_string()),
1397 _ => None,
1398 }
1399}
1400
1401fn component_dispatch_outcome(output: NodeOutput) -> Result<DispatchOutcome> {
1402 if let Some(control) = parse_component_control(&output.payload)? {
1403 return Ok(match control {
1404 NodeControl::Jump(jump) => {
1405 let adjusted = NodeOutput::with_meta(jump.payload.clone(), jump.hints.clone());
1406 DispatchOutcome::with_control(adjusted, NodeControl::Jump(jump))
1407 }
1408 NodeControl::Respond {
1409 text,
1410 card_cbor,
1411 needs_user,
1412 } => DispatchOutcome::with_control(
1413 output,
1414 NodeControl::Respond {
1415 text,
1416 card_cbor,
1417 needs_user,
1418 },
1419 ),
1420 other => DispatchOutcome::with_control(output, other),
1421 });
1422 }
1423 Ok(DispatchOutcome::complete(output))
1424}
1425
1426fn parse_component_control(payload: &Value) -> Result<Option<NodeControl>> {
1427 let Value::Object(map) = payload else {
1428 return Ok(None);
1429 };
1430 let Some(control_value) = map.get("greentic_control") else {
1431 return Ok(None);
1432 };
1433 let control = control_value
1434 .as_object()
1435 .ok_or_else(|| anyhow!("jump_failed: greentic_control must be an object"))?;
1436 let action = control
1437 .get("action")
1438 .and_then(Value::as_str)
1439 .ok_or_else(|| anyhow!("jump_failed: greentic_control.action is required"))?;
1440 let version = control
1441 .get("v")
1442 .and_then(Value::as_u64)
1443 .ok_or_else(|| anyhow!("jump_failed: greentic_control.v is required"))?;
1444 if version != 1 {
1445 bail!("jump_failed: unsupported greentic_control.v={version}");
1446 }
1447
1448 match action {
1449 "jump" => {
1450 let flow = control
1451 .get("flow")
1452 .and_then(Value::as_str)
1453 .map(str::trim)
1454 .filter(|value| !value.is_empty())
1455 .ok_or_else(|| anyhow!("jump_failed: jump flow is required"))?
1456 .to_string();
1457 let node = control
1458 .get("node")
1459 .and_then(Value::as_str)
1460 .map(str::trim)
1461 .filter(|value| !value.is_empty())
1462 .map(str::to_string);
1463 let payload = control.get("payload").cloned().unwrap_or(Value::Null);
1464 let hints = control.get("hints").cloned().unwrap_or(Value::Null);
1465 let max_redirects = control
1466 .get("max_redirects")
1467 .and_then(Value::as_u64)
1468 .and_then(|value| u32::try_from(value).ok());
1469 let reason = control
1470 .get("reason")
1471 .and_then(Value::as_str)
1472 .map(str::to_string);
1473 Ok(Some(NodeControl::Jump(JumpControl {
1474 flow,
1475 node,
1476 payload,
1477 hints,
1478 max_redirects,
1479 reason,
1480 })))
1481 }
1482 "respond" => {
1483 let text = control
1484 .get("text")
1485 .and_then(Value::as_str)
1486 .map(str::to_string);
1487 let card_cbor = control
1488 .get("card_cbor")
1489 .and_then(Value::as_array)
1490 .map(|bytes| {
1491 bytes
1492 .iter()
1493 .filter_map(Value::as_u64)
1494 .filter_map(|value| u8::try_from(value).ok())
1495 .collect::<Vec<_>>()
1496 });
1497 let needs_user = control.get("needs_user").and_then(Value::as_bool);
1498 Ok(Some(NodeControl::Respond {
1499 text,
1500 card_cbor,
1501 needs_user,
1502 }))
1503 }
1504 _ => Ok(None),
1505 }
1506}
1507
1508fn template_context(state: &ExecutionState, prev: Value) -> Value {
1509 let entry = if state.entry.is_null() {
1510 Value::Object(JsonMap::new())
1511 } else {
1512 state.entry.clone()
1513 };
1514 let mut ctx = JsonMap::new();
1515 ctx.insert("entry".into(), entry.clone());
1516 ctx.insert("in".into(), entry); ctx.insert("prev".into(), prev);
1518 ctx.insert("node".into(), Value::Object(state.outputs_map()));
1519 ctx.insert("state".into(), state.context());
1520 Value::Object(ctx)
1521}
1522
1523impl From<Flow> for HostFlow {
1524 fn from(value: Flow) -> Self {
1525 let mut nodes = IndexMap::new();
1526 for (id, node) in value.nodes {
1527 nodes.insert(id.clone(), HostNode::from(node));
1528 }
1529 let start = value
1530 .entrypoints
1531 .get("default")
1532 .and_then(Value::as_str)
1533 .and_then(|id| NodeId::from_str(id).ok())
1534 .or_else(|| nodes.keys().next().cloned());
1535 Self {
1536 id: value.id.as_str().to_string(),
1537 start,
1538 nodes,
1539 }
1540 }
1541}
1542
1543impl From<Node> for HostNode {
1544 fn from(node: Node) -> Self {
1545 let component_ref = node.component.id.as_str().to_string();
1546 let raw_operation = node.component.operation.clone();
1547 let operation_in_mapping = extract_operation_from_mapping(&node.input.mapping);
1548 let operation_is_component_exec = raw_operation.as_deref() == Some("component.exec");
1549 let operation_is_emit = raw_operation
1550 .as_deref()
1551 .map(|op| op.starts_with("emit."))
1552 .unwrap_or(false);
1553 let is_component_exec = component_ref == "component.exec" || operation_is_component_exec;
1554
1555 let kind = if is_component_exec {
1556 let target = if component_ref == "component.exec" {
1557 if let Some(op) = raw_operation
1558 .as_deref()
1559 .filter(|op| op.starts_with("emit."))
1560 {
1561 op.to_string()
1562 } else {
1563 extract_target_component(&node.input.mapping)
1564 .unwrap_or_else(|| "component.exec".to_string())
1565 }
1566 } else {
1567 extract_target_component(&node.input.mapping)
1568 .unwrap_or_else(|| component_ref.clone())
1569 };
1570 if target.starts_with("emit.") {
1571 NodeKind::BuiltinEmit {
1572 kind: emit_kind_from_ref(&target),
1573 }
1574 } else {
1575 NodeKind::Exec {
1576 target_component: target,
1577 }
1578 }
1579 } else if operation_is_emit {
1580 NodeKind::BuiltinEmit {
1581 kind: emit_kind_from_ref(raw_operation.as_deref().unwrap_or("emit.log")),
1582 }
1583 } else {
1584 match component_ref.as_str() {
1585 "flow.call" => NodeKind::FlowCall,
1586 "provider.invoke" => NodeKind::ProviderInvoke,
1587 "session.wait" => NodeKind::Wait,
1588 comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
1589 kind: emit_kind_from_ref(comp),
1590 },
1591 other => NodeKind::PackComponent {
1592 component_ref: other.to_string(),
1593 },
1594 }
1595 };
1596 let component_label = match &kind {
1597 NodeKind::Exec { .. } => "component.exec".to_string(),
1598 NodeKind::PackComponent { component_ref } => component_ref.clone(),
1599 NodeKind::ProviderInvoke => "provider.invoke".to_string(),
1600 NodeKind::FlowCall => "flow.call".to_string(),
1601 NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
1602 NodeKind::Wait => "session.wait".to_string(),
1603 };
1604 let operation_name = if is_component_exec && operation_is_component_exec {
1605 None
1606 } else {
1607 raw_operation.clone()
1608 };
1609 let payload_expr = match kind {
1610 NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
1611 _ => node.input.mapping.clone(),
1612 };
1613 Self {
1614 kind,
1615 component: component_label,
1616 component_id: if is_component_exec {
1617 "component.exec".to_string()
1618 } else {
1619 component_ref
1620 },
1621 operation_name,
1622 operation_in_mapping,
1623 payload_expr,
1624 routing: node.routing,
1625 }
1626 }
1627}
1628
1629fn extract_target_component(payload: &Value) -> Option<String> {
1630 match payload {
1631 Value::Object(map) => map
1632 .get("component")
1633 .or_else(|| map.get("component_ref"))
1634 .and_then(Value::as_str)
1635 .map(|s| s.to_string()),
1636 _ => None,
1637 }
1638}
1639
1640fn extract_operation_from_mapping(payload: &Value) -> Option<String> {
1641 match payload {
1642 Value::Object(map) => map
1643 .get("operation")
1644 .or_else(|| map.get("op"))
1645 .and_then(Value::as_str)
1646 .map(str::trim)
1647 .filter(|value| !value.is_empty())
1648 .map(|value| value.to_string()),
1649 _ => None,
1650 }
1651}
1652
1653fn extract_emit_payload(payload: &Value) -> Value {
1654 if let Value::Object(map) = payload {
1655 if let Some(input) = map.get("input") {
1656 return input.clone();
1657 }
1658 if let Some(inner) = map.get("payload") {
1659 return inner.clone();
1660 }
1661 }
1662 payload.clone()
1663}
1664
1665fn split_operation_payload(payload: Value) -> (Value, Value) {
1666 if let Value::Object(mut map) = payload.clone()
1667 && map.contains_key("input")
1668 {
1669 let input = map.remove("input").unwrap_or(Value::Null);
1670 let config = map.remove("config").unwrap_or(Value::Null);
1671 let legacy_only = map.keys().all(|key| {
1672 matches!(
1673 key.as_str(),
1674 "operation" | "op" | "component" | "component_ref"
1675 )
1676 });
1677 if legacy_only {
1678 return (input, config);
1679 }
1680 }
1681 (payload, Value::Null)
1682}
1683
1684fn resolve_component_operation(
1685 node_id: &str,
1686 component_label: &str,
1687 payload_operation: Option<String>,
1688 operation_override: Option<&str>,
1689 operation_in_mapping: Option<&str>,
1690) -> Result<String> {
1691 if let Some(op) = operation_override
1692 .map(str::trim)
1693 .filter(|value| !value.is_empty())
1694 {
1695 return Ok(op.to_string());
1696 }
1697
1698 if let Some(op) = payload_operation
1699 .as_deref()
1700 .map(str::trim)
1701 .filter(|value| !value.is_empty())
1702 {
1703 return Ok(op.to_string());
1704 }
1705
1706 let mut message = format!(
1707 "missing operation for node `{}` (component `{}`); expected node.component.operation to be set",
1708 node_id, component_label,
1709 );
1710 if let Some(found) = operation_in_mapping {
1711 message.push_str(&format!(
1712 ". Found operation in input.mapping (`{}`) but this is not used; pack compiler must preserve node.component.operation.",
1713 found
1714 ));
1715 }
1716 bail!(message);
1717}
1718
1719fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
1720 match component_ref {
1721 "emit.log" => EmitKind::Log,
1722 "emit.response" => EmitKind::Response,
1723 other => EmitKind::Other(other.to_string()),
1724 }
1725}
1726
1727fn emit_ref_from_kind(kind: &EmitKind) -> String {
1728 match kind {
1729 EmitKind::Log => "emit.log".to_string(),
1730 EmitKind::Response => "emit.response".to_string(),
1731 EmitKind::Other(other) => other.clone(),
1732 }
1733}
1734
1735fn is_card_invocation(input: &Value) -> bool {
1738 if let Value::Object(map) = input {
1739 return map.contains_key("card_source") || map.contains_key("card_spec");
1740 }
1741 false
1742}
1743
1744fn resolve_card_assets(input: &mut Value, pack: &crate::pack::PackRuntime) {
1751 resolve_card_spec_asset(input, pack);
1752
1753 if let Value::Object(map) = input
1756 && let Some(Value::Object(call)) = map.get_mut("call")
1757 && let Some(payload) = call.get_mut("payload")
1758 {
1759 resolve_card_spec_asset(payload, pack);
1760 }
1761}
1762
1763fn resolve_card_spec_asset(value: &mut Value, pack: &crate::pack::PackRuntime) {
1765 let Value::Object(map) = value else { return };
1766
1767 let is_asset = map
1768 .get("card_source")
1769 .and_then(Value::as_str)
1770 .map(|s| s.eq_ignore_ascii_case("asset"))
1771 .unwrap_or(false);
1772 if !is_asset {
1773 return;
1774 }
1775
1776 let asset_path = map
1777 .get("card_spec")
1778 .and_then(|spec| spec.get("asset_path"))
1779 .and_then(Value::as_str)
1780 .map(str::to_string);
1781
1782 let Some(asset_path) = asset_path else { return };
1783
1784 match pack.read_asset(&asset_path) {
1785 Ok(bytes) => {
1786 let card_json: Value = match serde_json::from_slice(&bytes) {
1787 Ok(v) => v,
1788 Err(err) => {
1789 tracing::warn!(
1790 asset_path,
1791 %err,
1792 "failed to parse card asset as JSON; leaving as asset reference"
1793 );
1794 return;
1795 }
1796 };
1797 tracing::debug!(asset_path, "pre-resolved card asset to inline_json");
1798 map.insert("card_source".into(), Value::String("inline".into()));
1799 if let Some(Value::Object(spec)) = map.get_mut("card_spec") {
1800 spec.insert("inline_json".into(), card_json);
1801 spec.remove("asset_path");
1802 }
1803 }
1804 Err(err) => {
1805 tracing::warn!(
1806 asset_path,
1807 %err,
1808 "card asset not found in pack; leaving as asset reference"
1809 );
1810 }
1811 }
1812}
1813
1814#[cfg(test)]
1815mod tests {
1816 use super::*;
1817 use crate::validate::{ValidationConfig, ValidationMode};
1818 use greentic_types::{
1819 Flow, FlowComponentRef, FlowId, FlowKind, InputMapping, Node, NodeId, OutputMapping,
1820 Routing, TelemetryHints,
1821 };
1822 use serde_json::json;
1823 use std::collections::BTreeMap;
1824 use std::str::FromStr;
1825 use std::sync::Mutex;
1826 use tokio::runtime::Runtime;
1827
1828 fn minimal_engine() -> FlowEngine {
1829 FlowEngine {
1830 packs: Vec::new(),
1831 flows: Vec::new(),
1832 flow_sources: HashMap::new(),
1833 flow_cache: RwLock::new(HashMap::new()),
1834 default_env: "local".to_string(),
1835 validation: ValidationConfig {
1836 mode: ValidationMode::Off,
1837 },
1838 }
1839 }
1840
1841 #[test]
1842 fn templating_renders_with_partials_and_data() {
1843 let mut state = ExecutionState::new(json!({ "city": "London" }));
1844 state.nodes.insert(
1845 "forecast".to_string(),
1846 NodeOutput::new(json!({ "temp": "20C" })),
1847 );
1848
1849 let ctx = state.context();
1851 assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
1852 }
1853
1854 #[test]
1855 fn finalize_wraps_emitted_payloads() {
1856 let mut state = ExecutionState::new(json!({}));
1857 state.push_egress(json!({ "text": "first" }));
1858 state.push_egress(json!({ "text": "second" }));
1859 let result = state.finalize_with(Some(json!({ "text": "final" })));
1860 assert_eq!(
1861 result,
1862 json!([
1863 { "text": "first" },
1864 { "text": "second" },
1865 { "text": "final" }
1866 ])
1867 );
1868 }
1869
1870 #[test]
1871 fn finalize_flattens_final_array() {
1872 let mut state = ExecutionState::new(json!({}));
1873 state.push_egress(json!({ "text": "only" }));
1874 let result = state.finalize_with(Some(json!([
1875 { "text": "extra-1" },
1876 { "text": "extra-2" }
1877 ])));
1878 assert_eq!(
1879 result,
1880 json!([
1881 { "text": "only" },
1882 { "text": "extra-1" },
1883 { "text": "extra-2" }
1884 ])
1885 );
1886 }
1887
1888 #[test]
1889 fn parse_component_control_ignores_plain_payload() {
1890 let payload = json!({
1891 "flow": "not-a-control-field",
1892 "node": "n1"
1893 });
1894 let control = parse_component_control(&payload).expect("parse control");
1895 assert!(control.is_none());
1896 }
1897
1898 #[test]
1899 fn parse_component_control_parses_jump_marker() {
1900 let payload = json!({
1901 "greentic_control": {
1902 "action": "jump",
1903 "v": 1,
1904 "flow": "flow.b",
1905 "node": "node-2",
1906 "payload": { "message": "hi" },
1907 "hints": { "k": "v" },
1908 "max_redirects": 2,
1909 "reason": "handoff"
1910 }
1911 });
1912 let control = parse_component_control(&payload)
1913 .expect("parse control")
1914 .expect("missing control");
1915 match control {
1916 NodeControl::Jump(jump) => {
1917 assert_eq!(jump.flow, "flow.b");
1918 assert_eq!(jump.node.as_deref(), Some("node-2"));
1919 assert_eq!(jump.payload, json!({ "message": "hi" }));
1920 assert_eq!(jump.hints, json!({ "k": "v" }));
1921 assert_eq!(jump.max_redirects, Some(2));
1922 assert_eq!(jump.reason.as_deref(), Some("handoff"));
1923 }
1924 other => panic!("expected jump control, got {other:?}"),
1925 }
1926 }
1927
1928 #[test]
1929 fn parse_component_control_rejects_invalid_marker() {
1930 let payload = json!({
1931 "greentic_control": "bad-shape"
1932 });
1933 let err = parse_component_control(&payload).expect_err("expected invalid marker error");
1934 assert!(err.to_string().contains("greentic_control"));
1935 }
1936
1937 #[test]
1938 fn missing_operation_reports_node_and_component() {
1939 let engine = minimal_engine();
1940 let rt = Runtime::new().unwrap();
1941 let retry_config = RetryConfig {
1942 max_attempts: 1,
1943 base_delay_ms: 1,
1944 };
1945 let ctx = FlowContext {
1946 tenant: "tenant",
1947 pack_id: "test-pack",
1948 flow_id: "flow",
1949 node_id: Some("missing-op"),
1950 tool: None,
1951 action: None,
1952 session_id: None,
1953 provider_id: None,
1954 retry_config,
1955 attempt: 1,
1956 observer: None,
1957 mocks: None,
1958 };
1959 let node = HostNode {
1960 kind: NodeKind::Exec {
1961 target_component: "qa.process".into(),
1962 },
1963 component: "component.exec".into(),
1964 component_id: "component.exec".into(),
1965 operation_name: None,
1966 operation_in_mapping: None,
1967 payload_expr: Value::Null,
1968 routing: Routing::End,
1969 };
1970 let _state = ExecutionState::new(Value::Null);
1971 let payload = json!({ "component": "qa.process" });
1972 let event = NodeEvent {
1973 context: &ctx,
1974 node_id: "missing-op",
1975 node: &node,
1976 payload: &payload,
1977 };
1978 let err = rt
1979 .block_on(engine.execute_component_exec(
1980 &ctx,
1981 "missing-op",
1982 &node,
1983 payload.clone(),
1984 &event,
1985 ComponentOverrides {
1986 component: None,
1987 operation: None,
1988 },
1989 ))
1990 .unwrap_err();
1991 let message = err.to_string();
1992 assert!(
1993 message.contains("missing operation for node `missing-op`"),
1994 "unexpected message: {message}"
1995 );
1996 assert!(
1997 message.contains("(component `component.exec`)"),
1998 "unexpected message: {message}"
1999 );
2000 }
2001
2002 #[test]
2003 fn missing_operation_mentions_mapping_hint() {
2004 let engine = minimal_engine();
2005 let rt = Runtime::new().unwrap();
2006 let retry_config = RetryConfig {
2007 max_attempts: 1,
2008 base_delay_ms: 1,
2009 };
2010 let ctx = FlowContext {
2011 tenant: "tenant",
2012 pack_id: "test-pack",
2013 flow_id: "flow",
2014 node_id: Some("missing-op-hint"),
2015 tool: None,
2016 action: None,
2017 session_id: None,
2018 provider_id: None,
2019 retry_config,
2020 attempt: 1,
2021 observer: None,
2022 mocks: None,
2023 };
2024 let node = HostNode {
2025 kind: NodeKind::Exec {
2026 target_component: "qa.process".into(),
2027 },
2028 component: "component.exec".into(),
2029 component_id: "component.exec".into(),
2030 operation_name: None,
2031 operation_in_mapping: Some("render".into()),
2032 payload_expr: Value::Null,
2033 routing: Routing::End,
2034 };
2035 let _state = ExecutionState::new(Value::Null);
2036 let payload = json!({ "component": "qa.process" });
2037 let event = NodeEvent {
2038 context: &ctx,
2039 node_id: "missing-op-hint",
2040 node: &node,
2041 payload: &payload,
2042 };
2043 let err = rt
2044 .block_on(engine.execute_component_exec(
2045 &ctx,
2046 "missing-op-hint",
2047 &node,
2048 payload.clone(),
2049 &event,
2050 ComponentOverrides {
2051 component: None,
2052 operation: None,
2053 },
2054 ))
2055 .unwrap_err();
2056 let message = err.to_string();
2057 assert!(
2058 message.contains("missing operation for node `missing-op-hint`"),
2059 "unexpected message: {message}"
2060 );
2061 assert!(
2062 message.contains("Found operation in input.mapping (`render`)"),
2063 "unexpected message: {message}"
2064 );
2065 }
2066
2067 struct CountingObserver {
2068 starts: Mutex<Vec<String>>,
2069 ends: Mutex<Vec<Value>>,
2070 }
2071
2072 impl CountingObserver {
2073 fn new() -> Self {
2074 Self {
2075 starts: Mutex::new(Vec::new()),
2076 ends: Mutex::new(Vec::new()),
2077 }
2078 }
2079 }
2080
2081 impl ExecutionObserver for CountingObserver {
2082 fn on_node_start(&self, event: &NodeEvent<'_>) {
2083 self.starts.lock().unwrap().push(event.node_id.to_string());
2084 }
2085
2086 fn on_node_end(&self, _event: &NodeEvent<'_>, output: &Value) {
2087 self.ends.lock().unwrap().push(output.clone());
2088 }
2089
2090 fn on_node_error(&self, _event: &NodeEvent<'_>, _error: &dyn StdError) {}
2091 }
2092
2093 #[test]
2094 fn emits_end_event_for_successful_node() {
2095 let node_id = NodeId::from_str("emit").unwrap();
2096 let node = Node {
2097 id: node_id.clone(),
2098 component: FlowComponentRef {
2099 id: "emit.log".parse().unwrap(),
2100 pack_alias: None,
2101 operation: None,
2102 },
2103 input: InputMapping {
2104 mapping: json!({ "message": "logged" }),
2105 },
2106 output: OutputMapping {
2107 mapping: Value::Null,
2108 },
2109 routing: Routing::End,
2110 telemetry: TelemetryHints::default(),
2111 };
2112 let mut nodes = indexmap::IndexMap::default();
2113 nodes.insert(node_id.clone(), node);
2114 let flow = Flow {
2115 schema_version: "1.0".into(),
2116 id: FlowId::from_str("emit.flow").unwrap(),
2117 kind: FlowKind::Messaging,
2118 entrypoints: BTreeMap::from([(
2119 "default".to_string(),
2120 Value::String(node_id.to_string()),
2121 )]),
2122 nodes,
2123 metadata: Default::default(),
2124 };
2125 let host_flow = HostFlow::from(flow);
2126
2127 let engine = FlowEngine {
2128 packs: Vec::new(),
2129 flows: Vec::new(),
2130 flow_sources: HashMap::new(),
2131 flow_cache: RwLock::new(HashMap::from([(
2132 FlowKey {
2133 pack_id: "test-pack".to_string(),
2134 flow_id: "emit.flow".to_string(),
2135 },
2136 host_flow,
2137 )])),
2138 default_env: "local".to_string(),
2139 validation: ValidationConfig {
2140 mode: ValidationMode::Off,
2141 },
2142 };
2143 let observer = CountingObserver::new();
2144 let ctx = FlowContext {
2145 tenant: "demo",
2146 pack_id: "test-pack",
2147 flow_id: "emit.flow",
2148 node_id: None,
2149 tool: None,
2150 action: None,
2151 session_id: None,
2152 provider_id: None,
2153 retry_config: RetryConfig {
2154 max_attempts: 1,
2155 base_delay_ms: 1,
2156 },
2157 attempt: 1,
2158 observer: Some(&observer),
2159 mocks: None,
2160 };
2161
2162 let rt = Runtime::new().unwrap();
2163 let result = rt.block_on(engine.execute(ctx, Value::Null)).unwrap();
2164 assert!(matches!(result.status, FlowStatus::Completed));
2165
2166 let starts = observer.starts.lock().unwrap();
2167 let ends = observer.ends.lock().unwrap();
2168 assert_eq!(starts.len(), 1);
2169 assert_eq!(ends.len(), 1);
2170 assert_eq!(ends[0], json!({ "message": "logged" }));
2171 }
2172
2173 fn host_flow_for_test(
2174 flow_id: &str,
2175 node_ids: &[&str],
2176 default_start: Option<&str>,
2177 ) -> HostFlow {
2178 let mut nodes = indexmap::IndexMap::default();
2179 for node_id in node_ids {
2180 let id = NodeId::from_str(node_id).unwrap();
2181 let node = Node {
2182 id: id.clone(),
2183 component: FlowComponentRef {
2184 id: "emit.log".parse().unwrap(),
2185 pack_alias: None,
2186 operation: None,
2187 },
2188 input: InputMapping {
2189 mapping: json!({ "message": node_id }),
2190 },
2191 output: OutputMapping {
2192 mapping: Value::Null,
2193 },
2194 routing: Routing::End,
2195 telemetry: TelemetryHints::default(),
2196 };
2197 nodes.insert(id, node);
2198 }
2199 let mut entrypoints = BTreeMap::new();
2200 if let Some(start) = default_start {
2201 entrypoints.insert("default".to_string(), Value::String(start.to_string()));
2202 }
2203 HostFlow::from(Flow {
2204 schema_version: "1.0".into(),
2205 id: FlowId::from_str(flow_id).unwrap(),
2206 kind: FlowKind::Messaging,
2207 entrypoints,
2208 nodes,
2209 metadata: Default::default(),
2210 })
2211 }
2212
2213 fn jump_test_engine() -> FlowEngine {
2214 let target_flow = host_flow_for_test("flow.target", &["node-a", "node-b"], None);
2215 FlowEngine {
2216 packs: Vec::new(),
2217 flows: Vec::new(),
2218 flow_sources: HashMap::new(),
2219 flow_cache: RwLock::new(HashMap::from([(
2220 FlowKey {
2221 pack_id: "test-pack".to_string(),
2222 flow_id: "flow.target".to_string(),
2223 },
2224 target_flow,
2225 )])),
2226 default_env: "local".to_string(),
2227 validation: ValidationConfig {
2228 mode: ValidationMode::Off,
2229 },
2230 }
2231 }
2232
2233 fn jump_ctx<'a>(flow_id: &'a str) -> FlowContext<'a> {
2234 FlowContext {
2235 tenant: "demo",
2236 pack_id: "test-pack",
2237 flow_id,
2238 node_id: None,
2239 tool: None,
2240 action: None,
2241 session_id: None,
2242 provider_id: None,
2243 retry_config: RetryConfig {
2244 max_attempts: 1,
2245 base_delay_ms: 1,
2246 },
2247 attempt: 1,
2248 observer: None,
2249 mocks: None,
2250 }
2251 }
2252
2253 #[test]
2254 fn apply_jump_unknown_flow_errors() {
2255 let engine = minimal_engine();
2256 let mut state = ExecutionState::new(Value::Null);
2257 let rt = Runtime::new().unwrap();
2258 let err = rt
2259 .block_on(engine.apply_jump(
2260 &jump_ctx("flow.source"),
2261 &mut state,
2262 JumpControl {
2263 flow: "flow.missing".into(),
2264 node: None,
2265 payload: json!({ "ok": true }),
2266 hints: Value::Null,
2267 max_redirects: None,
2268 reason: None,
2269 },
2270 ))
2271 .unwrap_err();
2272 assert!(
2273 err.to_string().contains("unknown_flow"),
2274 "unexpected error: {err}"
2275 );
2276 }
2277
2278 #[test]
2279 fn apply_jump_unknown_node_errors() {
2280 let engine = jump_test_engine();
2281 let mut state = ExecutionState::new(Value::Null);
2282 let rt = Runtime::new().unwrap();
2283 let err = rt
2284 .block_on(engine.apply_jump(
2285 &jump_ctx("flow.source"),
2286 &mut state,
2287 JumpControl {
2288 flow: "flow.target".into(),
2289 node: Some("node-missing".into()),
2290 payload: json!({ "ok": true }),
2291 hints: Value::Null,
2292 max_redirects: None,
2293 reason: None,
2294 },
2295 ))
2296 .unwrap_err();
2297 assert!(
2298 err.to_string().contains("unknown_node"),
2299 "unexpected error: {err}"
2300 );
2301 }
2302
2303 #[test]
2304 fn apply_jump_uses_default_start_fallback() {
2305 let engine = jump_test_engine();
2306 let mut state = ExecutionState::new(Value::Null);
2307 let rt = Runtime::new().unwrap();
2308 let target = rt
2309 .block_on(engine.apply_jump(
2310 &jump_ctx("flow.source"),
2311 &mut state,
2312 JumpControl {
2313 flow: "flow.target".into(),
2314 node: None,
2315 payload: json!({ "k": "v" }),
2316 hints: Value::Null,
2317 max_redirects: None,
2318 reason: None,
2319 },
2320 ))
2321 .expect("jump target");
2322 assert_eq!(target.flow_id, "flow.target");
2323 assert_eq!(target.node_id.as_str(), "node-a");
2324 }
2325
2326 #[test]
2327 fn apply_jump_redirect_limit_enforced() {
2328 let engine = jump_test_engine();
2329 let mut state = ExecutionState::new(Value::Null);
2330 state.redirect_count = 3;
2331 let rt = Runtime::new().unwrap();
2332 let err = rt
2333 .block_on(engine.apply_jump(
2334 &jump_ctx("flow.source"),
2335 &mut state,
2336 JumpControl {
2337 flow: "flow.target".into(),
2338 node: None,
2339 payload: json!({ "k": "v" }),
2340 hints: Value::Null,
2341 max_redirects: Some(3),
2342 reason: None,
2343 },
2344 ))
2345 .unwrap_err();
2346 assert_eq!(err.to_string(), "redirect_limit");
2347 }
2348}
2349
2350use tracing::Instrument;
2351
2352pub struct FlowContext<'a> {
2353 pub tenant: &'a str,
2354 pub pack_id: &'a str,
2355 pub flow_id: &'a str,
2356 pub node_id: Option<&'a str>,
2357 pub tool: Option<&'a str>,
2358 pub action: Option<&'a str>,
2359 pub session_id: Option<&'a str>,
2360 pub provider_id: Option<&'a str>,
2361 pub retry_config: RetryConfig,
2362 pub attempt: u32,
2363 pub observer: Option<&'a dyn ExecutionObserver>,
2364 pub mocks: Option<&'a MockLayer>,
2365}
2366
2367#[derive(Copy, Clone)]
2368pub struct RetryConfig {
2369 pub max_attempts: u32,
2370 pub base_delay_ms: u64,
2371}
2372
2373fn should_retry(err: &anyhow::Error) -> bool {
2374 let lower = err.to_string().to_lowercase();
2375 lower.contains("transient")
2376 || lower.contains("unavailable")
2377 || lower.contains("internal")
2378 || lower.contains("timeout")
2379}
2380
2381impl From<FlowRetryConfig> for RetryConfig {
2382 fn from(value: FlowRetryConfig) -> Self {
2383 Self {
2384 max_attempts: value.max_attempts.max(1),
2385 base_delay_ms: value.base_delay_ms.max(50),
2386 }
2387 }
2388}