1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use super::templating::{TemplateOptions, render_template_value};
18use crate::config::{FlowRetryConfig, HostConfig};
19use crate::pack::{FlowDescriptor, PackRuntime};
20use crate::runner::invocation::{InvocationMeta, build_invocation_envelope};
21use crate::telemetry::{
22 FlowSpanAttributes, RolloutIds, annotate_span, backoff_delay_ms, set_flow_context,
23};
24#[cfg(feature = "fault-injection")]
25use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
26use crate::validate::{
27 ValidationConfig, ValidationIssue, ValidationMode, validate_component_envelope,
28 validate_tool_envelope,
29};
30use greentic_flow::SLOT_SCHEMA_METADATA_KEY;
31use greentic_types::{Flow, Node, NodeId, Routing};
32
/// Component id of the slot extractor. `Exec` nodes that target this component get the
/// flow's slot schema injected into their rendered payload before they are dispatched.
const SLOT_EXTRACTOR_COMPONENT_ID: &str = "ai.greentic.component-slot-extractor";
37
/// Fallback used by `provider.invoke` nodes when the flow's own pack cannot resolve the
/// requested provider binding.
///
/// Implementations must be `Send + Sync` because the engine stores the resolver behind an
/// `Arc<dyn CrossPackResolver>`.
pub trait CrossPackResolver: Send + Sync {
    /// Invokes operation `op` on the provider identified by `provider_id` and/or
    /// `provider_type`.
    ///
    /// `input` is the JSON-serialized operation input and `tenant` is the tenant the flow
    /// runs for. `team` is passed through by the caller.
    /// NOTE(review): the meaning of `team` is not visible in this part of the file.
    ///
    /// # Errors
    /// Returns an error when the invocation fails.
    fn invoke(
        &self,
        provider_id: &str,
        provider_type: Option<&str>,
        op: &str,
        input: &[u8],
        tenant: &str,
        team: Option<&str>,
    ) -> Result<Value>;
}
55
/// Executes pack flows: loads flow definitions, walks their nodes and dispatches each node
/// to the matching component, provider, sub-flow or builtin handler.
pub struct FlowEngine {
    /// Loaded pack runtimes; `flow_sources` stores indices into this vector.
    packs: Vec<Arc<PackRuntime>>,
    /// Descriptors of every registered flow (deduplicated by pack id + flow id).
    flows: Vec<FlowDescriptor>,
    /// Maps a flow to the index (in `packs`) of the pack that provides it.
    flow_sources: HashMap<FlowKey, usize>,
    /// Cache of already converted flow definitions, filled eagerly in `new` and lazily in
    /// `get_or_load_flow`.
    flow_cache: RwLock<HashMap<FlowKey, HostFlow>>,
    /// Environment name read from `GREENTIC_ENV`, falling back to `"local"`.
    default_env: String,
    /// Validation settings copied from the host configuration.
    validation: ValidationConfig,
    /// Optional fallback consulted when a provider cannot be resolved in the flow's pack.
    cross_pack_resolver: Option<Arc<dyn CrossPackResolver>>,
    /// Rollout identifiers attached to the telemetry span of every flow execution.
    rollout_ids: RolloutIds,
}
70
/// Hash-map key identifying one flow by the pack that owns it and its flow id.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct FlowKey {
    /// Id of the pack providing the flow.
    pack_id: String,
    /// Id of the flow inside that pack.
    flow_id: String,
}
76
/// Serializable checkpoint of a paused flow, sufficient for `FlowEngine::resume` to
/// continue execution.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FlowSnapshot {
    /// Pack the flow belongs to; `resume` rejects snapshots whose pack differs from the
    /// requested one.
    pub pack_id: String,
    /// Flow id recorded when the snapshot was taken.
    pub flow_id: String,
    /// Flow to resume in, when set; `resume` falls back to `flow_id` otherwise. Omitted
    /// from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub next_flow: Option<String>,
    /// Id of the node at which execution continues on resume.
    pub next_node: String,
    /// Execution state captured at the pause (egress is cleared before it is stored).
    pub state: ExecutionState,
}
86
/// Details of a paused flow: why it paused and how to resume it.
#[derive(Clone, Debug)]
pub struct FlowWait {
    /// Optional human-readable reason for the pause.
    pub reason: Option<String>,
    /// Checkpoint to hand back to `FlowEngine::resume`.
    pub snapshot: FlowSnapshot,
}
92
/// Terminal status of one `execute`/`resume` call.
#[derive(Clone, Debug)]
pub enum FlowStatus {
    /// The flow ran to an end node (or responded) and will not continue.
    Completed,
    /// The flow paused; the boxed [`FlowWait`] carries the resume snapshot.
    Waiting(Box<FlowWait>),
}
98
/// Result of driving a flow: the finalized output value plus its status.
#[derive(Clone, Debug)]
pub struct FlowExecution {
    /// Output produced by finalizing the execution state.
    pub output: Value,
    /// Whether the flow completed or is waiting to be resumed.
    pub status: FlowStatus,
}
104
/// Host-side representation of a loaded flow, built via `HostFlow::from` on the flow
/// returned by `PackRuntime::load_flow`.
#[derive(Clone, Debug)]
struct HostFlow {
    /// Flow id, used in error messages.
    id: String,
    /// Explicit start node; when absent the first entry of `nodes` is used.
    start: Option<NodeId>,
    /// Nodes keyed by id, in insertion order.
    nodes: IndexMap<NodeId, HostNode>,
    /// Slot schema injected into slot-extractor node payloads, when the flow has one.
    slot_schema: Option<Value>,
}
114
/// One node of a [`HostFlow`], carrying everything needed to dispatch it.
#[derive(Clone, Debug)]
pub struct HostNode {
    /// How the node is dispatched.
    kind: NodeKind,
    /// Component label of the node.
    /// NOTE(review): how this differs from `component_id` is not visible here — confirm.
    pub component: String,
    /// Component id passed to operation resolution for diagnostics/lookup.
    component_id: String,
    /// Operation name declared on the node, if any.
    operation_name: Option<String>,
    /// Operation found in the node's input mapping, if any.
    operation_in_mapping: Option<String>,
    /// Payload template rendered against the execution state before dispatch.
    payload_expr: Value,
    /// Routing rule that selects the next node after this one finishes.
    routing: Routing,
}
126
127impl HostNode {
128 pub fn component_id(&self) -> &str {
129 &self.component_id
130 }
131
132 pub fn operation_name(&self) -> Option<&str> {
133 self.operation_name.as_deref()
134 }
135
136 pub fn operation_in_mapping(&self) -> Option<&str> {
137 self.operation_in_mapping.as_deref()
138 }
139}
140
/// Dispatch category of a node; `FlowEngine::dispatch_node` matches on it.
#[derive(Clone, Debug)]
enum NodeKind {
    /// `component.exec`-style node; `target_component` overrides any component named in
    /// the payload.
    Exec { target_component: String },
    /// Direct call to a component of the flow's pack, identified by `component_ref`.
    PackComponent { component_ref: String },
    /// Invocation of a provider operation.
    ProviderInvoke,
    /// Synchronous call into another flow of the same pack.
    FlowCall,
    /// Builtin emit node: the rendered payload is pushed to the egress list.
    BuiltinEmit { kind: EmitKind },
    /// Builtin read from the pack's state store.
    BuiltinStateGet,
    /// Builtin write to the pack's state store.
    BuiltinStateSet,
    /// Pauses the flow until it is resumed.
    Wait,
}
152
/// Flavor of a builtin emit node. All flavors are handled identically by the dispatcher;
/// `Other` additionally logs the component name at debug level.
#[derive(Clone, Debug)]
enum EmitKind {
    /// Log emit.
    Log,
    /// Response emit.
    Response,
    /// Any other `emit.*` component, carrying its name.
    Other(String),
}
159
/// Values taken from the node definition that take precedence over what the rendered
/// payload of a `component.exec` node specifies.
struct ComponentOverrides<'a> {
    /// Component to call instead of the payload's component, when set.
    component: Option<&'a str>,
    /// Operation override handed to operation resolution, when set.
    operation: Option<&'a str>,
}
164
/// Fully resolved component invocation, ready to be validated and sent to the pack.
struct ComponentCall {
    /// Component to invoke.
    component_ref: String,
    /// Operation to invoke on the component.
    operation: String,
    /// Operation input; wrapped in an invocation envelope unless it is a card invocation.
    input: Value,
    /// Component configuration; not sent when it is JSON `null`.
    config: Value,
}
171
172impl FlowExecution {
173 fn completed(output: Value) -> Self {
174 Self {
175 output,
176 status: FlowStatus::Completed,
177 }
178 }
179
180 fn waiting(output: Value, wait: FlowWait) -> Self {
181 Self {
182 output,
183 status: FlowStatus::Waiting(Box::new(wait)),
184 }
185 }
186}
187
188impl FlowEngine {
189 pub async fn new(packs: Vec<Arc<PackRuntime>>, config: Arc<HostConfig>) -> Result<Self> {
190 let mut flow_sources: HashMap<FlowKey, usize> = HashMap::new();
191 let mut descriptors = Vec::new();
192 let mut bindings = HashMap::new();
193 for pack in &config.pack_bindings {
194 bindings.insert(pack.pack_id.clone(), pack.flows.clone());
195 }
196 let enforce_bindings = !bindings.is_empty();
197 for (idx, pack) in packs.iter().enumerate() {
198 let pack_id = pack.metadata().pack_id.clone();
199 if enforce_bindings && !bindings.contains_key(&pack_id) {
200 bail!("no gtbind entries found for pack {}", pack_id);
201 }
202 let flows = pack.list_flows().await?;
203 let allowed = bindings.get(&pack_id).map(|flows| {
204 flows
205 .iter()
206 .cloned()
207 .collect::<std::collections::HashSet<_>>()
208 });
209 let mut seen = std::collections::HashSet::new();
210 for flow in flows {
211 if let Some(ref allow) = allowed
212 && !allow.contains(&flow.id)
213 {
214 continue;
215 }
216 seen.insert(flow.id.clone());
217 tracing::info!(
218 flow_id = %flow.id,
219 flow_type = %flow.flow_type,
220 pack_id = %flow.pack_id,
221 pack_index = idx,
222 "registered flow"
223 );
224 if let Ok(flow_ir) = pack.load_flow(&flow.id) {
225 for node in flow_ir.nodes.values() {
226 config
227 .secrets_policy
228 .register_flow_secret_refs(&node.input.mapping);
229 config
230 .secrets_policy
231 .register_flow_secret_refs(&node.output.mapping);
232 }
233 }
234 flow_sources.insert(
235 FlowKey {
236 pack_id: flow.pack_id.clone(),
237 flow_id: flow.id.clone(),
238 },
239 idx,
240 );
241 descriptors.retain(|existing: &FlowDescriptor| {
242 !(existing.id == flow.id && existing.pack_id == flow.pack_id)
243 });
244 descriptors.push(flow);
245 }
246 if let Some(allow) = allowed {
247 let missing = allow.difference(&seen).cloned().collect::<Vec<_>>();
248 if !missing.is_empty() {
249 bail!(
250 "gtbind flow ids missing in pack {}: {}",
251 pack_id,
252 missing.join(", ")
253 );
254 }
255 }
256 }
257
258 let mut flow_map = HashMap::new();
259 for flow in &descriptors {
260 let pack_id = flow.pack_id.clone();
261 if let Some(&pack_idx) = flow_sources.get(&FlowKey {
262 pack_id: pack_id.clone(),
263 flow_id: flow.id.clone(),
264 }) {
265 let pack_clone = Arc::clone(&packs[pack_idx]);
266 let flow_id = flow.id.clone();
267 let task_flow_id = flow_id.clone();
268 match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
269 Ok(Ok(loaded_flow)) => {
270 flow_map.insert(
271 FlowKey {
272 pack_id: pack_id.clone(),
273 flow_id,
274 },
275 HostFlow::from(loaded_flow),
276 );
277 }
278 Ok(Err(err)) => {
279 tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
280 }
281 Err(err) => {
282 tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
283 }
284 }
285 }
286 }
287
288 Ok(Self {
289 packs,
290 flows: descriptors,
291 flow_sources,
292 flow_cache: RwLock::new(flow_map),
293 default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
294 validation: config.validation.clone(),
295 cross_pack_resolver: None,
296 rollout_ids: RolloutIds::default(),
297 })
298 }
299
300 pub fn with_rollout_ids(mut self, rollout_ids: RolloutIds) -> Self {
306 self.rollout_ids = rollout_ids;
307 self
308 }
309
310 pub fn rollout_ids(&self) -> &RolloutIds {
314 &self.rollout_ids
315 }
316
317 pub fn set_cross_pack_resolver(&mut self, resolver: Arc<dyn CrossPackResolver>) {
320 self.cross_pack_resolver = Some(resolver);
321 }
322
323 async fn get_or_load_flow(&self, pack_id: &str, flow_id: &str) -> Result<HostFlow> {
324 let key = FlowKey {
325 pack_id: pack_id.to_string(),
326 flow_id: flow_id.to_string(),
327 };
328 if let Some(flow) = self.flow_cache.read().get(&key).cloned() {
329 return Ok(flow);
330 }
331
332 let pack_idx = *self
333 .flow_sources
334 .get(&key)
335 .with_context(|| format!("flow {pack_id}:{flow_id} not registered"))?;
336 let pack = Arc::clone(&self.packs[pack_idx]);
337 let flow_id_owned = flow_id.to_string();
338 let task_flow_id = flow_id_owned.clone();
339 let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
340 .await
341 .context("failed to join flow metadata task")??;
342 let host_flow = HostFlow::from(flow);
343 self.flow_cache.write().insert(
344 FlowKey {
345 pack_id: pack_id.to_string(),
346 flow_id: flow_id_owned.clone(),
347 },
348 host_flow.clone(),
349 );
350 Ok(host_flow)
351 }
352
353 fn flow_execute_span(&self, ctx: &FlowContext<'_>) -> tracing::Span {
360 let span = tracing::info_span!(
361 "flow.execute",
362 tenant = tracing::field::Empty,
363 flow_id = tracing::field::Empty,
364 node_id = tracing::field::Empty,
365 tool = tracing::field::Empty,
366 action = tracing::field::Empty
367 );
368 annotate_span(
369 &span,
370 &FlowSpanAttributes {
371 tenant: ctx.tenant,
372 flow_id: ctx.flow_id,
373 node_id: ctx.node_id,
374 tool: ctx.tool,
375 action: ctx.action,
376 },
377 );
378 set_flow_context(
379 &span,
380 &self.default_env,
381 ctx.tenant,
382 ctx.flow_id,
383 ctx.node_id,
384 ctx.provider_id,
385 ctx.session_id,
386 ctx.pack_id,
387 &self.rollout_ids,
388 );
389 span
390 }
391
392 pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
393 let span = self.flow_execute_span(&ctx);
394 let retry_config = ctx.retry_config;
395 let original_input = input;
396 let mut ctx = ctx;
397 let metric_tenant = ctx.tenant.to_string();
398 let metric_flow_id = ctx.flow_id.to_string();
399 let started = std::time::Instant::now();
400 let result = async move {
401 let mut attempt = 0u32;
402 loop {
403 attempt += 1;
404 ctx.attempt = attempt;
405 #[cfg(feature = "fault-injection")]
406 {
407 let fault_ctx = FaultContext {
408 pack_id: ctx.pack_id,
409 flow_id: ctx.flow_id,
410 node_id: ctx.node_id,
411 attempt: ctx.attempt,
412 };
413 maybe_fail(FaultPoint::Timeout, fault_ctx)
414 .map_err(|err| anyhow!(err.to_string()))?;
415 }
416 match self.execute_once(&ctx, original_input.clone()).await {
417 Ok(value) => return Ok(value),
418 Err(err) => {
419 if attempt >= retry_config.max_attempts || !should_retry(&err) {
420 if ctx.session_id.is_some() {
425 return Ok(FlowExecution::completed(json!({
426 "metadata": {
427 "error_kind": "flow_execution_failed",
428 "error_message": err.to_string(),
429 "flow_id": ctx.flow_id,
430 }
431 })));
432 }
433 return Err(err);
434 }
435 let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
436 tracing::warn!(
437 tenant = ctx.tenant,
438 flow_id = ctx.flow_id,
439 attempt,
440 max_attempts = retry_config.max_attempts,
441 delay_ms = delay,
442 error = %err,
443 "transient flow execution failure, backing off"
444 );
445 tokio::time::sleep(Duration::from_millis(delay)).await;
446 }
447 }
448 }
449 }
450 .instrument(span)
451 .await;
452 let status = if result.is_ok() { "ok" } else { "err" };
453 let duration_ms = started.elapsed().as_secs_f64() * 1000.0;
454 crate::metrics::record_flow_execution(&metric_tenant, &metric_flow_id, status, duration_ms);
455 result
456 }
457
458 pub async fn resume(
459 &self,
460 ctx: FlowContext<'_>,
461 snapshot: FlowSnapshot,
462 input: Value,
463 ) -> Result<FlowExecution> {
464 if snapshot.pack_id != ctx.pack_id {
465 bail!(
466 "snapshot pack {} does not match requested {}",
467 snapshot.pack_id,
468 ctx.pack_id
469 );
470 }
471 let resume_flow = snapshot
472 .next_flow
473 .clone()
474 .unwrap_or_else(|| snapshot.flow_id.clone());
475 let flow_ir = self.get_or_load_flow(ctx.pack_id, &resume_flow).await?;
476 let mut state = snapshot.state;
477 state.replace_input(input.clone());
487 state.entry = input;
488 let span = self.flow_execute_span(&ctx);
489 self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node), resume_flow)
490 .instrument(span)
491 .await
492 }
493
494 async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
495 let flow_ir = self.get_or_load_flow(ctx.pack_id, ctx.flow_id).await?;
496 let state = ExecutionState::new(input);
497 self.drive_flow(ctx, flow_ir, state, None, ctx.flow_id.to_string())
498 .await
499 }
500
501 async fn drive_flow(
502 &self,
503 ctx: &FlowContext<'_>,
504 mut flow_ir: HostFlow,
505 mut state: ExecutionState,
506 resume_from: Option<String>,
507 mut current_flow_id: String,
508 ) -> Result<FlowExecution> {
509 let mut current = match resume_from {
510 Some(node) => NodeId::from_str(&node)
511 .with_context(|| format!("invalid resume node id `{node}`"))?,
512 None => flow_ir
513 .start
514 .clone()
515 .or_else(|| flow_ir.nodes.keys().next().cloned())
516 .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
517 };
518
519 loop {
520 let step_ctx = FlowContext {
521 tenant: ctx.tenant,
522 pack_id: ctx.pack_id,
523 flow_id: current_flow_id.as_str(),
524 node_id: ctx.node_id,
525 tool: ctx.tool,
526 action: ctx.action,
527 session_id: ctx.session_id,
528 provider_id: ctx.provider_id,
529 retry_config: ctx.retry_config,
530 attempt: ctx.attempt,
531 observer: ctx.observer,
532 mocks: ctx.mocks,
533 };
534 let node = flow_ir
535 .nodes
536 .get(¤t)
537 .with_context(|| format!("node {} not found", current.as_str()))?;
538
539 let payload_template = node.payload_expr.clone();
540 let prev = state
541 .last_output
542 .as_ref()
543 .cloned()
544 .unwrap_or_else(|| Value::Object(JsonMap::new()));
545 let ctx_value = template_context(&state, prev);
546 #[cfg(feature = "fault-injection")]
547 {
548 let fault_ctx = FaultContext {
549 pack_id: ctx.pack_id,
550 flow_id: ctx.flow_id,
551 node_id: Some(current.as_str()),
552 attempt: ctx.attempt,
553 };
554 maybe_fail(FaultPoint::TemplateRender, fault_ctx)
555 .map_err(|err| anyhow!(err.to_string()))?;
556 }
557 let mut payload =
558 render_template_value(&payload_template, &ctx_value, TemplateOptions::default())
559 .context("failed to render node input template")?;
560 let node_id = current.clone();
561
562 if let NodeKind::Exec { target_component } = &node.kind
567 && target_component == SLOT_EXTRACTOR_COMPONENT_ID
568 && let Some(schema) = flow_ir.slot_schema.as_ref()
569 && let Some(map) = payload.as_object_mut()
570 {
571 let input = map.entry("input").or_insert(Value::Null);
572 inject_slot_definitions(input, schema, step_ctx.flow_id, node_id.as_str());
573 }
574
575 let observed_payload = payload.clone();
576 let event = NodeEvent {
577 context: &step_ctx,
578 node_id: node_id.as_str(),
579 node,
580 payload: &observed_payload,
581 };
582 if let Some(observer) = step_ctx.observer {
583 observer.on_node_start(&event);
584 }
585 let dispatch = self
586 .dispatch_node(
587 &step_ctx,
588 node_id.as_str(),
589 node,
590 &mut state,
591 payload,
592 &event,
593 )
594 .await;
595 let DispatchOutcome { output, control } = match dispatch {
596 Ok(outcome) => outcome,
597 Err(err) => {
598 if let Some(observer) = step_ctx.observer {
599 observer.on_node_error(&event, err.as_ref());
600 }
601 return Err(err);
605 }
606 };
607
608 state.nodes.insert(node_id.clone().into(), output.clone());
609 state.last_output = Some(output.payload.clone());
610 if let Some(observer) = step_ctx.observer {
611 observer.on_node_end(&event, &output.payload);
612 }
613
614 match control {
615 NodeControl::Continue => {
616 enum NextDecision {
617 Next(NodeId),
618 End,
619 Wait,
620 }
621 let decision = match &node.routing {
622 Routing::Next { node_id } => NextDecision::Next(node_id.clone()),
623 Routing::End | Routing::Reply => NextDecision::End,
624 Routing::Branch { default, .. } => match default {
625 Some(target) => NextDecision::Next(target.clone()),
626 None => NextDecision::End,
627 },
628 Routing::Custom(raw) => {
629 match evaluate_custom_routing(raw, &output, &state, &flow_ir, &node_id)
630 {
631 CustomRoutingDecision::Next(nid) => NextDecision::Next(nid),
632 CustomRoutingDecision::End => NextDecision::End,
633 CustomRoutingDecision::Wait => NextDecision::Wait,
634 }
635 }
636 };
637
638 match decision {
639 NextDecision::Next(n) => current = n,
640 NextDecision::End => {
641 let nodes_snapshot = state.nodes.clone();
642 let final_output = state.finalize_with(Some(output.payload.clone()));
643 return Ok(FlowExecution::completed(lift_first_node_error_from_nodes(
644 final_output,
645 &nodes_snapshot,
646 )));
647 }
648 NextDecision::Wait => {
649 let mut snapshot_state = state.clone();
654 snapshot_state.clear_egress();
655 let snapshot = FlowSnapshot {
656 pack_id: step_ctx.pack_id.to_string(),
657 flow_id: step_ctx.flow_id.to_string(),
658 next_flow: (current_flow_id != step_ctx.flow_id)
659 .then_some(current_flow_id.clone()),
660 next_node: node_id.as_str().to_string(),
661 state: snapshot_state,
662 };
663 let output_value = state.finalize_with(Some(output.payload.clone()));
664 return Ok(FlowExecution::waiting(
665 output_value,
666 FlowWait {
667 reason: Some(format!(
668 "awaiting user submit at node `{}`",
669 node_id.as_str()
670 )),
671 snapshot,
672 },
673 ));
674 }
675 }
676 }
677 NodeControl::Wait { reason } => {
678 let next: Option<NodeId> = match &node.routing {
679 Routing::Next { node_id } => Some(node_id.clone()),
680 Routing::End | Routing::Reply => None,
681 Routing::Branch { default, .. } => default.clone(),
682 Routing::Custom(raw) => {
683 match evaluate_custom_routing(raw, &output, &state, &flow_ir, &node_id)
684 {
685 CustomRoutingDecision::Next(nid) => Some(nid),
686 CustomRoutingDecision::End | CustomRoutingDecision::Wait => None,
691 }
692 }
693 };
694 let resume_target = next.ok_or_else(|| {
695 anyhow!(
696 "session.wait node {} requires a non-empty route",
697 current.as_str()
698 )
699 })?;
700 let mut snapshot_state = state.clone();
701 snapshot_state.clear_egress();
702 let snapshot = FlowSnapshot {
703 pack_id: step_ctx.pack_id.to_string(),
704 flow_id: step_ctx.flow_id.to_string(),
705 next_flow: (current_flow_id != step_ctx.flow_id)
706 .then_some(current_flow_id.clone()),
707 next_node: resume_target.as_str().to_string(),
708 state: snapshot_state,
709 };
710 let output_value = state.clone().finalize_with(None);
711 return Ok(FlowExecution::waiting(
712 output_value,
713 FlowWait { reason, snapshot },
714 ));
715 }
716 NodeControl::Jump(jump) => {
717 let jump_target = self.apply_jump(&step_ctx, &mut state, jump).await?;
718 flow_ir = jump_target.flow;
719 current_flow_id = jump_target.flow_id;
720 current = jump_target.node_id;
721 }
722 NodeControl::Respond {
723 text,
724 card_cbor,
725 needs_user,
726 } => {
727 let response = json!({
728 "text": text,
729 "card_cbor": card_cbor,
730 "needs_user": needs_user,
731 });
732 state.push_egress(response);
733 let nodes_snapshot = state.nodes.clone();
734 let final_output = state.finalize_with(None);
735 return Ok(FlowExecution::completed(lift_first_node_error_from_nodes(
736 final_output,
737 &nodes_snapshot,
738 )));
739 }
740 }
741 }
742 }
743
744 async fn dispatch_node(
745 &self,
746 ctx: &FlowContext<'_>,
747 node_id: &str,
748 node: &HostNode,
749 state: &mut ExecutionState,
750 mut payload: Value,
751 event: &NodeEvent<'_>,
752 ) -> Result<DispatchOutcome> {
753 inject_card_locale(&mut payload, &state.entry);
754 match &node.kind {
755 NodeKind::Exec { target_component } => self
756 .execute_component_exec(
757 ctx,
758 node_id,
759 node,
760 payload,
761 event,
762 ComponentOverrides {
763 component: Some(target_component.as_str()),
764 operation: node.operation_name.as_deref(),
765 },
766 )
767 .await
768 .and_then(component_dispatch_outcome),
769 NodeKind::PackComponent { component_ref } => self
770 .execute_component_call(ctx, node_id, node, payload, component_ref.as_str(), event)
771 .await
772 .and_then(component_dispatch_outcome),
773 NodeKind::FlowCall => self
774 .execute_flow_call(ctx, payload)
775 .await
776 .map(DispatchOutcome::complete),
777 NodeKind::ProviderInvoke => self
778 .execute_provider_invoke(ctx, node_id, state, payload, event)
779 .await
780 .map(DispatchOutcome::complete),
781 NodeKind::BuiltinEmit { kind } => {
782 match kind {
783 EmitKind::Log | EmitKind::Response => {}
784 EmitKind::Other(component) => {
785 tracing::debug!(%component, "handling emit.* as builtin");
786 }
787 }
788 state.push_egress(payload.clone());
789 Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
790 }
791 NodeKind::BuiltinStateGet => self
792 .execute_state_get(ctx, payload)
793 .await
794 .map(DispatchOutcome::complete),
795 NodeKind::BuiltinStateSet => self
796 .execute_state_set(ctx, payload)
797 .await
798 .map(DispatchOutcome::complete),
799 NodeKind::Wait => {
800 let reason = extract_wait_reason(&payload);
801 Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
802 }
803 }
804 }
805
806 async fn execute_state_get(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
807 let key = Self::extract_state_key_helper(&payload)?;
808 let pack = self.pack_for_flow(ctx)?;
809 let store = pack
810 .state_store_handle()
811 .context("state store is not configured for this runtime")?;
812 let tenant_ctx = self.state_tenant_ctx(ctx)?;
813 let state_key = greentic_state::StateKey::new(&key);
814 let value = store
815 .get_json(
816 &tenant_ctx,
817 crate::storage::state::STATE_PREFIX,
818 &state_key,
819 None,
820 )
821 .with_context(|| format!("state.get failed for key `{key}`"))?;
822 let payload = serde_json::json!({
823 "key": key,
824 "value": value,
825 "found": value.is_some(),
826 });
827 Ok(NodeOutput::new(payload))
828 }
829
830 async fn execute_state_set(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
831 let key = Self::extract_state_key_helper(&payload)?;
832 let value = payload.get("value").cloned().unwrap_or(Value::Null);
833 let pack = self.pack_for_flow(ctx)?;
834 let store = pack
835 .state_store_handle()
836 .context("state store is not configured for this runtime")?;
837 let tenant_ctx = self.state_tenant_ctx(ctx)?;
838 let state_key = greentic_state::StateKey::new(&key);
839 store
840 .set_json(
841 &tenant_ctx,
842 crate::storage::state::STATE_PREFIX,
843 &state_key,
844 None,
845 &value,
846 None,
847 )
848 .with_context(|| format!("state.set failed for key `{key}`"))?;
849 let payload = serde_json::json!({ "key": key, "value": value });
850 Ok(NodeOutput::new(payload))
851 }
852
853 fn pack_for_flow(&self, ctx: &FlowContext<'_>) -> Result<&Arc<PackRuntime>> {
854 let key = FlowKey {
855 pack_id: ctx.pack_id.to_string(),
856 flow_id: ctx.flow_id.to_string(),
857 };
858 let idx = self.flow_sources.get(&key).with_context(|| {
859 format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
860 })?;
861 Ok(&self.packs[*idx])
862 }
863
864 fn extract_state_key_helper(payload: &Value) -> Result<String> {
865 payload
866 .get("key")
867 .and_then(Value::as_str)
868 .map(String::from)
869 .filter(|k| !k.is_empty())
870 .context("state node payload missing required `key` (non-empty string)")
871 }
872
873 fn state_tenant_ctx(&self, ctx: &FlowContext<'_>) -> Result<greentic_types::TenantCtx> {
874 let env = greentic_types::EnvId::from_str(&self.default_env)
875 .with_context(|| format!("invalid env id `{}`", self.default_env))?;
876 let tenant = greentic_types::TenantId::from_str(ctx.tenant)
877 .with_context(|| format!("invalid tenant id `{}`", ctx.tenant))?;
878 Ok(greentic_types::TenantCtx::new(env, tenant))
879 }
880
881 async fn apply_jump(
882 &self,
883 ctx: &FlowContext<'_>,
884 state: &mut ExecutionState,
885 jump: JumpControl,
886 ) -> Result<JumpTarget> {
887 let target_flow = jump.flow.trim();
888 if target_flow.is_empty() {
889 bail!("missing_flow");
890 }
891
892 let flow = self
893 .get_or_load_flow(ctx.pack_id, target_flow)
894 .await
895 .with_context(|| format!("unknown_flow:{target_flow}"))?;
896
897 let target_node = if let Some(node) = jump.node.as_deref() {
898 let parsed = NodeId::from_str(node).with_context(|| format!("unknown_node:{node}"))?;
899 if !flow.nodes.contains_key(&parsed) {
900 bail!("unknown_node:{node}");
901 }
902 parsed
903 } else {
904 flow.start
905 .clone()
906 .or_else(|| flow.nodes.keys().next().cloned())
907 .ok_or_else(|| anyhow!("jump_failed: flow {target_flow} has no start node"))?
908 };
909
910 let max_redirects = jump.max_redirects.unwrap_or(3);
911 if state.redirect_count() >= max_redirects {
912 bail!("redirect_limit");
913 }
914 state.increment_redirect_count();
915 state.replace_input(jump.payload.clone());
916 state.last_output = Some(jump.payload);
917 tracing::info!(
918 flow_id = %ctx.flow_id,
919 target_flow = %target_flow,
920 target_node = %target_node.as_str(),
921 reason = ?jump.reason,
922 redirects = state.redirect_count(),
923 "flow.jump.applied"
924 );
925
926 Ok(JumpTarget {
927 flow_id: target_flow.to_string(),
928 flow,
929 node_id: target_node,
930 })
931 }
932
933 async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
934 #[derive(Deserialize)]
935 struct FlowCallPayload {
936 #[serde(alias = "flow")]
937 flow_id: String,
938 #[serde(default)]
939 input: Value,
940 }
941
942 let call: FlowCallPayload =
943 serde_json::from_value(payload).context("invalid payload for flow.call node")?;
944 if call.flow_id.trim().is_empty() {
945 bail!("flow.call requires a non-empty flow_id");
946 }
947
948 let sub_input = if call.input.is_null() {
949 Value::Null
950 } else {
951 call.input
952 };
953
954 let flow_id_owned = call.flow_id;
955 let action = "flow.call";
956 let sub_ctx = FlowContext {
957 tenant: ctx.tenant,
958 pack_id: ctx.pack_id,
959 flow_id: flow_id_owned.as_str(),
960 node_id: None,
961 tool: ctx.tool,
962 action: Some(action),
963 session_id: ctx.session_id,
964 provider_id: ctx.provider_id,
965 retry_config: ctx.retry_config,
966 attempt: ctx.attempt,
967 observer: ctx.observer,
968 mocks: ctx.mocks,
969 };
970
971 let execution = Box::pin(self.execute(sub_ctx, sub_input))
972 .await
973 .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
974 match execution.status {
975 FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
976 FlowStatus::Waiting(wait) => bail!(
977 "flow.call cannot pause (flow {} waiting {:?})",
978 flow_id_owned,
979 wait.reason
980 ),
981 }
982 }
983
984 async fn execute_component_exec(
985 &self,
986 ctx: &FlowContext<'_>,
987 node_id: &str,
988 node: &HostNode,
989 payload: Value,
990 event: &NodeEvent<'_>,
991 overrides: ComponentOverrides<'_>,
992 ) -> Result<NodeOutput> {
993 #[derive(Deserialize)]
994 struct ComponentPayload {
995 #[serde(default, alias = "component_ref", alias = "component")]
996 component: Option<String>,
997 #[serde(alias = "op")]
998 operation: Option<String>,
999 #[serde(default)]
1000 input: Value,
1001 #[serde(default)]
1002 config: Value,
1003 }
1004
1005 let payload: ComponentPayload =
1006 serde_json::from_value(payload).context("invalid payload for component.exec")?;
1007 let component_ref = overrides
1008 .component
1009 .map(str::to_string)
1010 .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
1011 .with_context(|| "component.exec requires a component_ref")?;
1012 let operation = resolve_component_operation(
1013 node_id,
1014 node.component_id.as_str(),
1015 payload.operation,
1016 overrides.operation,
1017 node.operation_in_mapping.as_deref(),
1018 )?;
1019
1020 let call = ComponentCall {
1021 component_ref,
1022 operation,
1023 input: payload.input,
1024 config: payload.config,
1025 };
1026
1027 self.invoke_component_call(ctx, node_id, call, event).await
1028 }
1029
1030 async fn execute_component_call(
1031 &self,
1032 ctx: &FlowContext<'_>,
1033 node_id: &str,
1034 node: &HostNode,
1035 payload: Value,
1036 component_ref: &str,
1037 event: &NodeEvent<'_>,
1038 ) -> Result<NodeOutput> {
1039 let payload_operation = extract_operation_from_mapping(&payload);
1040 let (input, config) = split_operation_payload(payload);
1041 let operation = resolve_component_operation(
1042 node_id,
1043 node.component_id.as_str(),
1044 payload_operation,
1045 node.operation_name.as_deref(),
1046 node.operation_in_mapping.as_deref(),
1047 )?;
1048 let call = ComponentCall {
1049 component_ref: component_ref.to_string(),
1050 operation,
1051 input,
1052 config,
1053 };
1054 self.invoke_component_call(ctx, node_id, call, event).await
1055 }
1056
1057 async fn invoke_component_call(
1058 &self,
1059 ctx: &FlowContext<'_>,
1060 node_id: &str,
1061 mut call: ComponentCall,
1062 event: &NodeEvent<'_>,
1063 ) -> Result<NodeOutput> {
1064 self.validate_component(ctx, event, &call)?;
1065 let key = FlowKey {
1066 pack_id: ctx.pack_id.to_string(),
1067 flow_id: ctx.flow_id.to_string(),
1068 };
1069 let pack_idx = *self.flow_sources.get(&key).with_context(|| {
1070 format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
1071 })?;
1072 let pack = Arc::clone(&self.packs[pack_idx]);
1073
1074 promote_card_config_to_invocation(&mut call.input, &call.config);
1081
1082 resolve_card_assets(&mut call.input, &pack);
1086
1087 let is_card = is_card_invocation(&call.input);
1095
1096 let input_json = if is_card {
1097 serde_json::to_string(&call.input)?
1098 } else {
1099 let meta = InvocationMeta {
1101 env: &self.default_env,
1102 tenant: ctx.tenant,
1103 flow_id: ctx.flow_id,
1104 node_id: Some(node_id),
1105 provider_id: ctx.provider_id,
1106 session_id: ctx.session_id,
1107 attempt: ctx.attempt,
1108 };
1109 let invocation_envelope =
1110 build_invocation_envelope(meta, call.operation.as_str(), call.input)
1111 .context("build invocation envelope for component call")?;
1112 serde_json::to_string(&invocation_envelope)?
1113 };
1114 let config_json = if call.config.is_null() {
1115 None
1116 } else {
1117 Some(serde_json::to_string(&call.config)?)
1118 };
1119
1120 let exec_ctx = component_exec_ctx(ctx, node_id);
1121 #[cfg(feature = "fault-injection")]
1122 {
1123 let fault_ctx = FaultContext {
1124 pack_id: ctx.pack_id,
1125 flow_id: ctx.flow_id,
1126 node_id: Some(node_id),
1127 attempt: ctx.attempt,
1128 };
1129 maybe_fail(FaultPoint::BeforeComponentCall, fault_ctx)
1130 .map_err(|err| anyhow!(err.to_string()))?;
1131 }
1132 let value = pack
1133 .invoke_component(
1134 call.component_ref.as_str(),
1135 exec_ctx,
1136 call.operation.as_str(),
1137 config_json,
1138 input_json,
1139 )
1140 .await?;
1141 #[cfg(feature = "fault-injection")]
1142 {
1143 let fault_ctx = FaultContext {
1144 pack_id: ctx.pack_id,
1145 flow_id: ctx.flow_id,
1146 node_id: Some(node_id),
1147 attempt: ctx.attempt,
1148 };
1149 maybe_fail(FaultPoint::AfterComponentCall, fault_ctx)
1150 .map_err(|err| anyhow!(err.to_string()))?;
1151 }
1152
1153 if let Some((code, message)) = component_error(&value) {
1154 bail!(
1155 "component {} failed: {}: {}",
1156 call.component_ref,
1157 code,
1158 message
1159 );
1160 }
1161 if let Some((code, message)) = mcp_tool_error(&value) {
1167 bail!(
1168 "component {} returned tool error: {}: {}",
1169 call.component_ref,
1170 code,
1171 message
1172 );
1173 }
1174 Ok(NodeOutput::new(value))
1175 }
1176
    /// Executes a `provider.invoke` node.
    ///
    /// Steps, in order:
    /// 1. Deserialize `payload` into the function-local `ProviderPayload` shape and
    ///    require a non-blank `op` (the key `operation` is accepted as an alias).
    /// 2. Build the provider input: render `in_map` as a template when it is non-null,
    ///    otherwise pass `input` through, otherwise send JSON `null`.
    /// 3. Run envelope validation through `validate_tool`.
    /// 4. Resolve the provider binding in the pack that owns the current flow; when
    ///    that fails, give the cross-pack resolver a chance before surfacing the error.
    /// 5. Invoke the provider, record the invocation metric, and, when `out_map` is
    ///    non-null, reshape the result through it.
    ///
    /// # Errors
    /// Fails when the payload does not deserialize, `op` is missing or blank, a template
    /// fails to render, validation rejects the envelope, the flow is not registered in
    /// `flow_sources`, provider resolution fails (and no cross-pack output is produced),
    /// the provider call fails, or (with the `fault-injection` feature) a fault fires.
    async fn execute_provider_invoke(
        &self,
        ctx: &FlowContext<'_>,
        node_id: &str,
        state: &ExecutionState,
        payload: Value,
        event: &NodeEvent<'_>,
    ) -> Result<NodeOutput> {
        // Every field is optional at the serde level; `op` is enforced manually below.
        #[derive(Deserialize)]
        struct ProviderPayload {
            #[serde(default)]
            provider_id: Option<String>,
            #[serde(default)]
            provider_type: Option<String>,
            #[serde(default, alias = "operation")]
            op: Option<String>,
            #[serde(default)]
            input: Value,
            #[serde(default)]
            in_map: Value,
            #[serde(default)]
            out_map: Value,
            #[serde(default)]
            err_map: Value,
        }

        let payload: ProviderPayload =
            serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
        // Blank-ness is tested on the trimmed text, but the stored `op` keeps its
        // original (untrimmed) form.
        let op = payload
            .op
            .as_deref()
            .filter(|v| !v.trim().is_empty())
            .with_context(|| "provider.invoke requires an op")?
            .to_string();

        // `prev` in templates is the last recorded output, or an empty object when
        // nothing has been recorded yet.
        let prev = state
            .last_output
            .as_ref()
            .cloned()
            .unwrap_or_else(|| Value::Object(JsonMap::new()));
        let base_ctx = template_context(state, prev);

        let input_value = if !payload.in_map.is_null() {
            // For `in_map`, both `input` and `result` in the template context refer to
            // the node's raw `input` value.
            let mut ctx_value = base_ctx.clone();
            if let Value::Object(ref mut map) = ctx_value {
                map.insert("input".into(), payload.input.clone());
                map.insert("result".into(), payload.input.clone());
            }
            render_template_value(
                &payload.in_map,
                &ctx_value,
                TemplateOptions {
                    allow_pointer: true,
                },
            )
            .context("failed to render provider.invoke in_map")?
        } else if !payload.input.is_null() {
            payload.input
        } else {
            Value::Null
        };
        let input_json = serde_json::to_vec(&input_value)?;

        self.validate_tool(
            ctx,
            event,
            payload.provider_id.as_deref(),
            payload.provider_type.as_deref(),
            &op,
            &input_value,
        )?;

        // Locate the pack that registered this flow.
        let key = FlowKey {
            pack_id: ctx.pack_id.to_string(),
            flow_id: ctx.flow_id.to_string(),
        };
        let pack_idx = *self.flow_sources.get(&key).with_context(|| {
            format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
        })?;
        let pack = Arc::clone(&self.packs[pack_idx]);
        let binding = pack.resolve_provider(
            payload.provider_id.as_deref(),
            payload.provider_type.as_deref(),
        );

        // Pack-local resolution failed: try the cross-pack resolver before giving up.
        // NOTE(review): output produced on this path is returned as-is, so `out_map`
        // is not applied, no provider-invocation metric is recorded, and the tool-call
        // fault points are skipped. Confirm that this is intended.
        if binding.is_err()
            && let Some(output) = self.try_invoke_cross_pack_resolver(
                payload.provider_id.as_deref(),
                payload.provider_type.as_deref(),
                &op,
                &input_json,
                ctx.tenant,
            )?
        {
            return Ok(output);
        }

        // No cross-pack output: surface the original resolution error, if any.
        let binding = binding?;
        let exec_ctx = component_exec_ctx(ctx, node_id);
        #[cfg(feature = "fault-injection")]
        {
            let fault_ctx = FaultContext {
                pack_id: ctx.pack_id,
                flow_id: ctx.flow_id,
                node_id: Some(node_id),
                attempt: ctx.attempt,
            };
            maybe_fail(FaultPoint::BeforeToolCall, fault_ctx)
                .map_err(|err| anyhow!(err.to_string()))?;
        }
        // Metric label: provider id, else provider type, else "unknown".
        let provider_metric_id = payload
            .provider_id
            .as_deref()
            .or(payload.provider_type.as_deref())
            .unwrap_or("unknown");
        let invoke_started = std::time::Instant::now();
        let invoke_result = pack
            .invoke_provider(&binding, exec_ctx, &op, input_json)
            .await;
        let invoke_duration_ms = invoke_started.elapsed().as_secs_f64() * 1000.0;
        // The metric is recorded for both outcomes, before the error is propagated.
        crate::metrics::record_provider_invocation(
            ctx.tenant,
            provider_metric_id,
            &op,
            if invoke_result.is_ok() { "ok" } else { "err" },
            invoke_duration_ms,
        );
        let result = invoke_result?;
        #[cfg(feature = "fault-injection")]
        {
            let fault_ctx = FaultContext {
                pack_id: ctx.pack_id,
                flow_id: ctx.flow_id,
                node_id: Some(node_id),
                attempt: ctx.attempt,
            };
            maybe_fail(FaultPoint::AfterToolCall, fault_ctx)
                .map_err(|err| anyhow!(err.to_string()))?;
        }

        let output = if payload.out_map.is_null() {
            result
        } else {
            // For `out_map`, both `input` and `result` in the template context refer to
            // the provider's result.
            let mut ctx_value = base_ctx;
            if let Value::Object(ref mut map) = ctx_value {
                map.insert("input".into(), result.clone());
                map.insert("result".into(), result.clone());
            }
            render_template_value(
                &payload.out_map,
                &ctx_value,
                TemplateOptions {
                    allow_pointer: true,
                },
            )
            .context("failed to render provider.invoke out_map")?
        };
        // `err_map` is accepted in the payload but not used by this function.
        let _ = payload.err_map;
        Ok(NodeOutput::new(output))
    }
1338
1339 fn try_invoke_cross_pack_resolver(
1340 &self,
1341 provider_id: Option<&str>,
1342 provider_type: Option<&str>,
1343 op: &str,
1344 input_json: &[u8],
1345 tenant: &str,
1346 ) -> Result<Option<NodeOutput>> {
1347 eprintln!(
1348 "[DEBUG] provider.invoke: pack-local failed, has_resolver={}",
1349 self.cross_pack_resolver.is_some()
1350 );
1351 let Some(resolver) = self.cross_pack_resolver.as_ref() else {
1352 return Ok(None);
1353 };
1354 let provider_id = provider_id.unwrap_or("unknown");
1355 tracing::info!(
1356 provider_id,
1357 op = %op,
1358 "provider.invoke: pack-local resolution failed, trying cross-pack resolver"
1359 );
1360 let result_value =
1361 resolver.invoke(provider_id, provider_type, op, input_json, tenant, None)?;
1362 Ok(Some(NodeOutput::new(result_value)))
1363 }
1364
1365 fn validate_component(
1366 &self,
1367 ctx: &FlowContext<'_>,
1368 event: &NodeEvent<'_>,
1369 call: &ComponentCall,
1370 ) -> Result<()> {
1371 if self.validation.mode == ValidationMode::Off {
1372 return Ok(());
1373 }
1374 let mut metadata = JsonMap::new();
1375 metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
1376 if let Some(id) = ctx.session_id {
1377 metadata.insert("session".to_string(), json!({ "id": id }));
1378 }
1379 let envelope = json!({
1380 "component_id": call.component_ref,
1381 "operation": call.operation,
1382 "input": call.input,
1383 "config": call.config,
1384 "metadata": Value::Object(metadata),
1385 });
1386 let issues = validate_component_envelope(&envelope);
1387 self.report_validation(ctx, event, "component", issues)
1388 }
1389
1390 fn validate_tool(
1391 &self,
1392 ctx: &FlowContext<'_>,
1393 event: &NodeEvent<'_>,
1394 provider_id: Option<&str>,
1395 provider_type: Option<&str>,
1396 operation: &str,
1397 input: &Value,
1398 ) -> Result<()> {
1399 if self.validation.mode == ValidationMode::Off {
1400 return Ok(());
1401 }
1402 let tool_id = provider_id.or(provider_type).unwrap_or("provider.invoke");
1403 let mut metadata = JsonMap::new();
1404 metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
1405 if let Some(id) = ctx.session_id {
1406 metadata.insert("session".to_string(), json!({ "id": id }));
1407 }
1408 let envelope = json!({
1409 "tool_id": tool_id,
1410 "operation": operation,
1411 "input": input,
1412 "metadata": Value::Object(metadata),
1413 });
1414 let issues = validate_tool_envelope(&envelope);
1415 self.report_validation(ctx, event, "tool", issues)
1416 }
1417
1418 fn report_validation(
1419 &self,
1420 ctx: &FlowContext<'_>,
1421 event: &NodeEvent<'_>,
1422 kind: &str,
1423 issues: Vec<ValidationIssue>,
1424 ) -> Result<()> {
1425 if issues.is_empty() {
1426 return Ok(());
1427 }
1428 if let Some(observer) = ctx.observer {
1429 observer.on_validation(event, &issues);
1430 }
1431 match self.validation.mode {
1432 ValidationMode::Warn => {
1433 tracing::warn!(
1434 tenant = ctx.tenant,
1435 flow_id = ctx.flow_id,
1436 node_id = event.node_id,
1437 kind,
1438 issues = ?issues,
1439 "invocation envelope validation issues"
1440 );
1441 Ok(())
1442 }
1443 ValidationMode::Error => {
1444 tracing::error!(
1445 tenant = ctx.tenant,
1446 flow_id = ctx.flow_id,
1447 node_id = event.node_id,
1448 kind,
1449 issues = ?issues,
1450 "invocation envelope validation failed"
1451 );
1452 bail!("invocation_validation_failed");
1453 }
1454 ValidationMode::Off => Ok(()),
1455 }
1456 }
1457
1458 pub fn flows(&self) -> &[FlowDescriptor] {
1459 &self.flows
1460 }
1461
1462 pub fn flow_by_key(&self, pack_id: &str, flow_id: &str) -> Option<&FlowDescriptor> {
1463 self.flows
1464 .iter()
1465 .find(|descriptor| descriptor.pack_id == pack_id && descriptor.id == flow_id)
1466 }
1467
1468 pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
1469 let mut matches = self
1470 .flows
1471 .iter()
1472 .filter(|descriptor| descriptor.flow_type == flow_type);
1473 let first = matches.next()?;
1474 if matches.next().is_some() {
1475 return None;
1476 }
1477 Some(first)
1478 }
1479
1480 pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
1481 let mut matches = self
1482 .flows
1483 .iter()
1484 .filter(|descriptor| descriptor.id == flow_id);
1485 let first = matches.next()?;
1486 if matches.next().is_some() {
1487 return None;
1488 }
1489 Some(first)
1490 }
1491}
1492
/// Hook interface for observing node execution.
///
/// Implementations must be `Send + Sync`. Only `on_validation` has a default body.
pub trait ExecutionObserver: Send + Sync {
    /// Node-start hook. Presumably called before a node is dispatched; the call site
    /// is outside this section, so confirm there.
    fn on_node_start(&self, event: &NodeEvent<'_>);
    /// Node-end hook, receiving the node's output value. Presumably called after a
    /// node completes successfully; confirm at the call site.
    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
    /// Node-error hook, receiving the error. Presumably called when a node fails;
    /// confirm at the call site.
    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
    /// Called with the envelope validation issues found for a node invocation.
    /// The default implementation does nothing.
    fn on_validation(&self, _event: &NodeEvent<'_>, _issues: &[ValidationIssue]) {}
}
1499
/// Borrowed description of a node, passed to [`ExecutionObserver`] hooks.
pub struct NodeEvent<'a> {
    /// Flow context the node belongs to.
    pub context: &'a FlowContext<'a>,
    /// Identifier of the node (logged as `node_id` in validation reports).
    pub node_id: &'a str,
    /// The node definition.
    pub node: &'a HostNode,
    /// Payload associated with the node. Presumably its resolved input; confirm at
    /// the construction site.
    pub payload: &'a Value,
}
1506
/// Serializable state carried through one flow execution.
///
/// Every field falls back to its default when missing during deserialization.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionState {
    /// Entry value; `new` initialises it to a copy of the initial input.
    #[serde(default)]
    entry: Value,
    /// Current input; `replace_input` overwrites it.
    #[serde(default)]
    input: Value,
    /// Recorded node outputs. Presumably keyed by node id: the keys are exposed as
    /// ids under `nodes` in `context`.
    #[serde(default)]
    nodes: HashMap<String, NodeOutput>,
    /// Payloads emitted so far; combined with the final payload by `finalize_with`.
    #[serde(default)]
    egress: Vec<Value>,
    /// Most recent output, if any. Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    last_output: Option<Value>,
    /// Redirect counter; `increment_redirect_count` bumps it with saturation.
    #[serde(default)]
    redirect_count: u32,
}
1522
1523impl ExecutionState {
1524 fn new(input: Value) -> Self {
1525 Self {
1526 entry: input.clone(),
1527 input,
1528 nodes: HashMap::new(),
1529 egress: Vec::new(),
1530 last_output: None,
1531 redirect_count: 0,
1532 }
1533 }
1534
1535 #[allow(dead_code)]
1539 fn ensure_entry(&mut self) {
1540 if self.entry.is_null() {
1541 self.entry = self.input.clone();
1542 }
1543 }
1544
1545 fn context(&self) -> Value {
1546 let mut nodes = JsonMap::new();
1547 for (id, output) in &self.nodes {
1548 nodes.insert(
1549 id.clone(),
1550 json!({
1551 "ok": output.ok,
1552 "payload": output.payload.clone(),
1553 "meta": output.meta.clone(),
1554 }),
1555 );
1556 }
1557 json!({
1558 "entry": self.entry.clone(),
1559 "input": self.input.clone(),
1560 "nodes": nodes,
1561 "redirect_count": self.redirect_count,
1562 })
1563 }
1564
1565 fn outputs_map(&self) -> JsonMap<String, Value> {
1566 let mut outputs = JsonMap::new();
1567 for (id, output) in &self.nodes {
1568 outputs.insert(id.clone(), output.payload.clone());
1569 }
1570 outputs
1571 }
1572 fn push_egress(&mut self, payload: Value) {
1573 self.egress.push(payload);
1574 }
1575
1576 fn replace_input(&mut self, input: Value) {
1577 self.input = input;
1578 }
1579
1580 fn clear_egress(&mut self) {
1581 self.egress.clear();
1582 }
1583
1584 fn redirect_count(&self) -> u32 {
1585 self.redirect_count
1586 }
1587
1588 fn increment_redirect_count(&mut self) {
1589 self.redirect_count = self.redirect_count.saturating_add(1);
1590 }
1591
1592 fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
1593 if self.egress.is_empty() {
1594 return final_payload.unwrap_or(Value::Null);
1595 }
1596 let mut emitted = std::mem::take(&mut self.egress);
1597 if let Some(value) = final_payload {
1598 match value {
1599 Value::Null => {}
1600 Value::Array(items) => emitted.extend(items),
1601 other => emitted.push(other),
1602 }
1603 }
1604 Value::Array(emitted)
1605 }
1606}
1607
/// Result recorded for a single node.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct NodeOutput {
    /// `true` for outputs built by `new` / `with_meta`, `false` for `with_error`.
    ok: bool,
    /// Value produced by the node; `null` for error outputs built by `with_error`.
    payload: Value,
    /// Side-channel metadata; `null` by default, an `error` object for `with_error`.
    meta: Value,
}
1614
1615impl NodeOutput {
1616 fn new(payload: Value) -> Self {
1617 Self {
1618 ok: true,
1619 payload,
1620 meta: Value::Null,
1621 }
1622 }
1623
1624 #[allow(dead_code)]
1628 fn with_error(node_id: &str, err: &(dyn std::error::Error + 'static)) -> Self {
1629 Self {
1630 ok: false,
1631 payload: Value::Null,
1632 meta: json!({
1633 "error": {
1634 "kind": "flow_node_failed",
1635 "message": err.to_string(),
1636 "node_id": node_id,
1637 }
1638 }),
1639 }
1640 }
1641}
1642
/// Result of dispatching one node: its output plus the control signal that
/// accompanies it.
struct DispatchOutcome {
    /// Output produced by the node.
    output: NodeOutput,
    /// Control signal attached to the output (see [`NodeControl`]).
    control: NodeControl,
}
1647
1648impl DispatchOutcome {
1649 fn complete(output: NodeOutput) -> Self {
1650 Self {
1651 output,
1652 control: NodeControl::Continue,
1653 }
1654 }
1655
1656 fn wait(output: NodeOutput, reason: Option<String>) -> Self {
1657 Self {
1658 output,
1659 control: NodeControl::Wait { reason },
1660 }
1661 }
1662
1663 fn with_control(output: NodeOutput, control: NodeControl) -> Self {
1664 Self { output, control }
1665 }
1666}
1667
/// Control signal attached to a node's dispatch outcome.
#[derive(Clone, Debug)]
enum NodeControl {
    /// No special control; used by `DispatchOutcome::complete`.
    Continue,
    /// Wait signal with an optional reason; used by `DispatchOutcome::wait`.
    Wait {
        /// Optional free-form reason.
        reason: Option<String>,
    },
    /// Jump request, parsed from a `greentic_control` object with action `"jump"`.
    Jump(JumpControl),
    /// Respond request, parsed from a `greentic_control` object with action `"respond"`.
    Respond {
        /// Optional `text` string from the control object.
        text: Option<String>,
        /// Optional bytes from the control object's `card_cbor` array.
        card_cbor: Option<Vec<u8>>,
        /// Optional `needs_user` flag from the control object.
        needs_user: Option<bool>,
    },
}
1681
/// Parameters of a jump request, as parsed by `parse_component_control`.
#[derive(Clone, Debug)]
struct JumpControl {
    /// Target flow; required, trimmed and non-empty.
    flow: String,
    /// Optional target node; trimmed, and absent when blank.
    node: Option<String>,
    /// The control object's `payload` value, or `null` when missing.
    payload: Value,
    /// The control object's `hints` value, or `null` when missing.
    hints: Value,
    /// Optional `max_redirects`; absent when missing, not an unsigned integer, or
    /// too large for `u32`.
    max_redirects: Option<u32>,
    /// Optional free-form reason (not trimmed).
    reason: Option<String>,
}
1691
/// A flow/node pair to jump to.
///
/// NOTE(review): the code that builds and consumes this type is outside this
/// section; presumably it is the resolved form of a [`JumpControl`]. Confirm there.
#[derive(Clone, Debug)]
struct JumpTarget {
    /// Identifier of the target flow.
    flow_id: String,
    /// The target flow itself.
    flow: HostFlow,
    /// Node within the target flow.
    node_id: NodeId,
}
1698
1699impl NodeOutput {
1700 fn with_meta(payload: Value, meta: Value) -> Self {
1701 Self {
1702 ok: true,
1703 payload,
1704 meta,
1705 }
1706 }
1707}
1708
1709fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
1710 ComponentExecCtx {
1711 tenant: ComponentTenantCtx {
1712 tenant: ctx.tenant.to_string(),
1713 team: None,
1714 user: ctx.provider_id.map(str::to_string),
1715 trace_id: None,
1716 i18n_id: None,
1717 correlation_id: ctx.session_id.map(str::to_string),
1718 deadline_unix_ms: None,
1719 attempt: ctx.attempt,
1720 idempotency_key: ctx.session_id.map(str::to_string),
1721 },
1722 i18n_id: None,
1723 flow_id: ctx.flow_id.to_string(),
1724 node_id: Some(node_id.to_string()),
1725 }
1726}
1727
1728fn component_error(value: &Value) -> Option<(String, String)> {
1729 let obj = value.as_object()?;
1730 let ok = obj.get("ok").and_then(Value::as_bool)?;
1731 if ok {
1732 return None;
1733 }
1734 let err = obj.get("error")?.as_object()?;
1735 let code = err
1736 .get("code")
1737 .and_then(Value::as_str)
1738 .unwrap_or("component_error");
1739 let message = err
1740 .get("message")
1741 .and_then(Value::as_str)
1742 .unwrap_or("component reported error");
1743 Some((code.to_string(), message.to_string()))
1744}
1745
1746fn mcp_tool_error(value: &Value) -> Option<(String, String)> {
1751 let obj = value.as_object()?;
1752 if obj.contains_key("result") {
1754 return None;
1755 }
1756 let err = obj.get("error")?.as_object()?;
1757 let code = err
1758 .get("code")
1759 .and_then(Value::as_str)
1760 .unwrap_or("tool_error");
1761 let raw_message = err
1762 .get("message")
1763 .and_then(Value::as_str)
1764 .unwrap_or("tool returned an error");
1765 let status = err.get("status").and_then(Value::as_u64);
1766 let message = match status {
1767 Some(s) => format!("{raw_message} (status {s})"),
1768 None => raw_message.to_string(),
1769 };
1770 Some((code.to_string(), message))
1771}
1772
1773fn extract_wait_reason(payload: &Value) -> Option<String> {
1774 match payload {
1775 Value::String(s) => Some(s.clone()),
1776 Value::Object(map) => map
1777 .get("reason")
1778 .and_then(Value::as_str)
1779 .map(|value| value.to_string()),
1780 _ => None,
1781 }
1782}
1783
1784fn component_dispatch_outcome(output: NodeOutput) -> Result<DispatchOutcome> {
1785 if let Some(control) = parse_component_control(&output.payload)? {
1786 return Ok(match control {
1787 NodeControl::Jump(jump) => {
1788 let adjusted = NodeOutput::with_meta(jump.payload.clone(), jump.hints.clone());
1789 DispatchOutcome::with_control(adjusted, NodeControl::Jump(jump))
1790 }
1791 NodeControl::Respond {
1792 text,
1793 card_cbor,
1794 needs_user,
1795 } => DispatchOutcome::with_control(
1796 output,
1797 NodeControl::Respond {
1798 text,
1799 card_cbor,
1800 needs_user,
1801 },
1802 ),
1803 other => DispatchOutcome::with_control(output, other),
1804 });
1805 }
1806 Ok(DispatchOutcome::complete(output))
1807}
1808
1809fn parse_component_control(payload: &Value) -> Result<Option<NodeControl>> {
1810 let Value::Object(map) = payload else {
1811 return Ok(None);
1812 };
1813 let Some(control_value) = map.get("greentic_control") else {
1814 return Ok(None);
1815 };
1816 let control = control_value
1817 .as_object()
1818 .ok_or_else(|| anyhow!("jump_failed: greentic_control must be an object"))?;
1819 let action = control
1820 .get("action")
1821 .and_then(Value::as_str)
1822 .ok_or_else(|| anyhow!("jump_failed: greentic_control.action is required"))?;
1823 let version = control
1824 .get("v")
1825 .and_then(Value::as_u64)
1826 .ok_or_else(|| anyhow!("jump_failed: greentic_control.v is required"))?;
1827 if version != 1 {
1828 bail!("jump_failed: unsupported greentic_control.v={version}");
1829 }
1830
1831 match action {
1832 "jump" => {
1833 let flow = control
1834 .get("flow")
1835 .and_then(Value::as_str)
1836 .map(str::trim)
1837 .filter(|value| !value.is_empty())
1838 .ok_or_else(|| anyhow!("jump_failed: jump flow is required"))?
1839 .to_string();
1840 let node = control
1841 .get("node")
1842 .and_then(Value::as_str)
1843 .map(str::trim)
1844 .filter(|value| !value.is_empty())
1845 .map(str::to_string);
1846 let payload = control.get("payload").cloned().unwrap_or(Value::Null);
1847 let hints = control.get("hints").cloned().unwrap_or(Value::Null);
1848 let max_redirects = control
1849 .get("max_redirects")
1850 .and_then(Value::as_u64)
1851 .and_then(|value| u32::try_from(value).ok());
1852 let reason = control
1853 .get("reason")
1854 .and_then(Value::as_str)
1855 .map(str::to_string);
1856 Ok(Some(NodeControl::Jump(JumpControl {
1857 flow,
1858 node,
1859 payload,
1860 hints,
1861 max_redirects,
1862 reason,
1863 })))
1864 }
1865 "respond" => {
1866 let text = control
1867 .get("text")
1868 .and_then(Value::as_str)
1869 .map(str::to_string);
1870 let card_cbor = control
1871 .get("card_cbor")
1872 .and_then(Value::as_array)
1873 .map(|bytes| {
1874 bytes
1875 .iter()
1876 .filter_map(Value::as_u64)
1877 .filter_map(|value| u8::try_from(value).ok())
1878 .collect::<Vec<_>>()
1879 });
1880 let needs_user = control.get("needs_user").and_then(Value::as_bool);
1881 Ok(Some(NodeControl::Respond {
1882 text,
1883 card_cbor,
1884 needs_user,
1885 }))
1886 }
1887 _ => Ok(None),
1888 }
1889}
1890
1891fn template_context(state: &ExecutionState, prev: Value) -> Value {
1892 let entry = if state.entry.is_null() {
1893 Value::Object(JsonMap::new())
1894 } else {
1895 state.entry.clone()
1896 };
1897 let mut ctx = JsonMap::new();
1898 ctx.insert("entry".into(), entry.clone());
1899 ctx.insert("in".into(), entry); ctx.insert("prev".into(), prev);
1901 ctx.insert("node".into(), Value::Object(state.outputs_map()));
1902 ctx.insert("state".into(), state.context());
1903 Value::Object(ctx)
1904}
1905
1906impl From<Flow> for HostFlow {
1907 fn from(value: Flow) -> Self {
1908 let mut nodes = IndexMap::new();
1909 for (id, node) in value.nodes {
1910 nodes.insert(id.clone(), HostNode::from(node));
1911 }
1912 let start = value
1913 .entrypoints
1914 .get("default")
1915 .and_then(Value::as_str)
1916 .and_then(|id| NodeId::from_str(id).ok())
1917 .or_else(|| nodes.keys().next().cloned());
1918 let slot_schema = value
1922 .metadata
1923 .extra
1924 .get(SLOT_SCHEMA_METADATA_KEY)
1925 .filter(|v| !v.is_null())
1926 .cloned();
1927 Self {
1928 id: value.id.as_str().to_string(),
1929 start,
1930 nodes,
1931 slot_schema,
1932 }
1933 }
1934}
1935
impl From<Node> for HostNode {
    /// Classifies a flow node into a [`NodeKind`] and derives its component label,
    /// operation name and payload expression.
    ///
    /// The component id is taken as-is when the node has an explicit operation, when
    /// the id starts with one of the builtin prefixes below, or when the input mapping
    /// names an operation. Otherwise an id containing a dot is split at its last dot
    /// into `(component, operation)`.
    fn from(node: Node) -> Self {
        let full_ref = node.component.id.as_str().to_string();
        let operation_in_mapping = extract_operation_from_mapping(&node.input.mapping);
        // Ids with these prefixes are never split at the last dot.
        // NOTE(review): "state." is not in this list, so a bare `state.get` /
        // `state.set` id (no explicit operation, no operation in the mapping) is
        // split into ("state", "get"/"set") and ends up as `PackComponent` rather
        // than matching the `state.get` / `state.set` arms below. Confirm that this
        // is intended.
        let is_builtin = full_ref.starts_with("component.exec")
            || full_ref.starts_with("flow.")
            || full_ref.starts_with("emit.")
            || full_ref.starts_with("session.")
            || full_ref.starts_with("provider.");
        let (component_ref, raw_operation) =
            if node.component.operation.is_some() || is_builtin || operation_in_mapping.is_some() {
                (full_ref, node.component.operation.clone())
            } else if let Some(dot) = full_ref.rfind('.') {
                // Shorthand `component.operation`: everything after the last dot is
                // the operation.
                let comp = full_ref[..dot].to_string();
                let op = full_ref[dot + 1..].to_string();
                (comp, Some(op))
            } else {
                (full_ref, None)
            };
        let operation_is_component_exec = raw_operation.as_deref() == Some("component.exec");
        let operation_is_emit = raw_operation
            .as_deref()
            .map(|op| op.starts_with("emit."))
            .unwrap_or(false);
        let is_component_exec = component_ref == "component.exec" || operation_is_component_exec;

        let kind = if is_component_exec {
            // Work out what `component.exec` actually targets.
            let target = if component_ref == "component.exec" {
                if let Some(op) = raw_operation
                    .as_deref()
                    .filter(|op| op.starts_with("emit."))
                {
                    // An `emit.*` operation on `component.exec` is itself the target.
                    op.to_string()
                } else {
                    extract_target_component(&node.input.mapping)
                        .unwrap_or_else(|| "component.exec".to_string())
                }
            } else {
                // The operation is `component.exec`: the target comes from the
                // mapping, falling back to the component id itself.
                extract_target_component(&node.input.mapping)
                    .unwrap_or_else(|| component_ref.clone())
            };
            if target.starts_with("emit.") {
                NodeKind::BuiltinEmit {
                    kind: emit_kind_from_ref(&target),
                }
            } else {
                NodeKind::Exec {
                    target_component: target,
                }
            }
        } else if operation_is_emit {
            NodeKind::BuiltinEmit {
                kind: emit_kind_from_ref(raw_operation.as_deref().unwrap_or("emit.log")),
            }
        } else {
            match component_ref.as_str() {
                "flow.call" => NodeKind::FlowCall,
                "provider.invoke" => NodeKind::ProviderInvoke,
                "session.wait" => NodeKind::Wait,
                "state.get" => NodeKind::BuiltinStateGet,
                "state.set" => NodeKind::BuiltinStateSet,
                comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
                    kind: emit_kind_from_ref(comp),
                },
                other => NodeKind::PackComponent {
                    component_ref: other.to_string(),
                },
            }
        };
        // Label stored in the `component` field, derived from the resolved kind.
        let component_label = match &kind {
            NodeKind::Exec { .. } => "component.exec".to_string(),
            NodeKind::PackComponent { component_ref } => component_ref.clone(),
            NodeKind::ProviderInvoke => "provider.invoke".to_string(),
            NodeKind::FlowCall => "flow.call".to_string(),
            NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
            NodeKind::BuiltinStateGet => "state.get".to_string(),
            NodeKind::BuiltinStateSet => "state.set".to_string(),
            NodeKind::Wait => "session.wait".to_string(),
        };
        // A literal `component.exec` operation is not kept as the operation name.
        let operation_name = if is_component_exec && operation_is_component_exec {
            None
        } else {
            raw_operation.clone()
        };
        // Emit nodes use only the inner `input` / `payload` part of the mapping.
        let payload_expr = match kind {
            NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
            _ => node.input.mapping.clone(),
        };
        Self {
            kind,
            component: component_label,
            component_id: if is_component_exec {
                "component.exec".to_string()
            } else {
                component_ref
            },
            operation_name,
            operation_in_mapping,
            payload_expr,
            routing: node.routing,
        }
    }
}
2047
2048fn extract_target_component(payload: &Value) -> Option<String> {
2049 match payload {
2050 Value::Object(map) => map
2051 .get("component")
2052 .or_else(|| map.get("component_ref"))
2053 .and_then(Value::as_str)
2054 .map(|s| s.to_string()),
2055 _ => None,
2056 }
2057}
2058
2059fn extract_operation_from_mapping(payload: &Value) -> Option<String> {
2060 match payload {
2061 Value::Object(map) => map
2062 .get("operation")
2063 .or_else(|| map.get("op"))
2064 .and_then(Value::as_str)
2065 .map(str::trim)
2066 .filter(|value| !value.is_empty())
2067 .map(|value| value.to_string()),
2068 _ => None,
2069 }
2070}
2071
2072fn extract_emit_payload(payload: &Value) -> Value {
2073 if let Value::Object(map) = payload {
2074 if let Some(input) = map.get("input") {
2075 return input.clone();
2076 }
2077 if let Some(inner) = map.get("payload") {
2078 return inner.clone();
2079 }
2080 }
2081 payload.clone()
2082}
2083
2084fn split_operation_payload(payload: Value) -> (Value, Value) {
2085 if let Value::Object(mut map) = payload.clone()
2086 && map.contains_key("input")
2087 {
2088 let input = map.remove("input").unwrap_or(Value::Null);
2089 let config = map.remove("config").unwrap_or(Value::Null);
2090 let legacy_only = map.keys().all(|key| {
2091 matches!(
2092 key.as_str(),
2093 "operation" | "op" | "component" | "component_ref"
2094 )
2095 });
2096 if legacy_only {
2097 return (input, config);
2098 }
2099 }
2100 (payload, Value::Null)
2101}
2102
2103fn resolve_component_operation(
2104 node_id: &str,
2105 component_label: &str,
2106 payload_operation: Option<String>,
2107 operation_override: Option<&str>,
2108 operation_in_mapping: Option<&str>,
2109) -> Result<String> {
2110 if let Some(op) = operation_override
2111 .map(str::trim)
2112 .filter(|value| !value.is_empty())
2113 {
2114 return Ok(op.to_string());
2115 }
2116
2117 if let Some(op) = payload_operation
2118 .as_deref()
2119 .map(str::trim)
2120 .filter(|value| !value.is_empty())
2121 {
2122 return Ok(op.to_string());
2123 }
2124
2125 let mut message = format!(
2126 "missing operation for node `{}` (component `{}`); expected node.component.operation to be set",
2127 node_id, component_label,
2128 );
2129 if let Some(found) = operation_in_mapping {
2130 message.push_str(&format!(
2131 ". Found operation in input.mapping (`{}`) but this is not used; pack compiler must preserve node.component.operation.",
2132 found
2133 ));
2134 }
2135 bail!(message);
2136}
2137
2138fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
2139 match component_ref {
2140 "emit.log" => EmitKind::Log,
2141 "emit.response" => EmitKind::Response,
2142 other => EmitKind::Other(other.to_string()),
2143 }
2144}
2145
2146fn emit_ref_from_kind(kind: &EmitKind) -> String {
2147 match kind {
2148 EmitKind::Log => "emit.log".to_string(),
2149 EmitKind::Response => "emit.response".to_string(),
2150 EmitKind::Other(other) => other.clone(),
2151 }
2152}
2153
2154fn is_card_invocation(input: &Value) -> bool {
2157 if let Value::Object(map) = input {
2158 return map.contains_key("card_source") || map.contains_key("card_spec");
2159 }
2160 false
2161}
2162
2163fn promote_card_config_to_invocation(input: &mut Value, config: &Value) {
2175 if is_card_invocation(input) {
2176 return;
2177 }
2178
2179 let cfg_map = card_defaults_source(input, config);
2180 let Some(cfg) = cfg_map else { return };
2181
2182 let default_asset = cfg
2183 .get("default_card_asset")
2184 .and_then(Value::as_str)
2185 .map(str::trim)
2186 .filter(|value| !value.is_empty())
2187 .map(str::to_string);
2188 let default_inline = cfg
2189 .get("default_card_inline")
2190 .filter(|value| value.is_object() || value.is_array())
2191 .cloned();
2192 let default_source = cfg
2193 .get("default_source")
2194 .and_then(Value::as_str)
2195 .map(str::trim)
2196 .filter(|value| !value.is_empty())
2197 .map(str::to_lowercase);
2198
2199 if default_asset.is_none() && default_inline.is_none() && default_source.is_none() {
2200 return;
2201 }
2202
2203 let card_source = default_source.unwrap_or_else(|| {
2204 if default_inline.is_some() {
2205 "inline".to_string()
2206 } else {
2207 "asset".to_string()
2208 }
2209 });
2210
2211 let mut card_spec = serde_json::Map::new();
2212 match card_source.as_str() {
2213 "asset" => {
2214 if let Some(path) = default_asset {
2215 card_spec.insert("asset_path".into(), Value::String(path));
2216 }
2217 }
2218 "inline" => {
2219 if let Some(inline) = default_inline {
2220 card_spec.insert("inline_json".into(), inline);
2221 }
2222 }
2223 _ => {}
2224 }
2225
2226 if !matches!(input, Value::Object(_)) {
2227 *input = Value::Object(serde_json::Map::new());
2228 }
2229 if let Value::Object(map) = input {
2230 map.insert("card_source".into(), Value::String(card_source));
2231 map.insert("card_spec".into(), Value::Object(card_spec));
2232 }
2233}
2234
2235fn card_defaults_source<'a>(
2240 input: &'a Value,
2241 config: &'a Value,
2242) -> Option<&'a serde_json::Map<String, Value>> {
2243 if let Value::Object(map) = config {
2244 return Some(map);
2245 }
2246 if let Value::Object(map) = input
2247 && let Some(Value::Object(nested)) = map.get("config")
2248 {
2249 return Some(nested);
2250 }
2251 None
2252}
2253
2254fn inject_card_locale(payload: &mut Value, entry: &Value) {
2255 if !is_card_invocation(payload) {
2256 return;
2257 }
2258 let Value::Object(map) = payload else { return };
2259 if map.contains_key("locale") {
2260 return;
2261 }
2262 let locale = entry
2263 .pointer("/input/metadata/locale")
2264 .or_else(|| entry.pointer("/metadata/locale"))
2265 .and_then(Value::as_str);
2266 if let Some(locale) = locale {
2267 map.insert("locale".into(), Value::String(locale.to_string()));
2268 }
2269}
2270
2271fn inject_slot_definitions(input: &mut Value, slot_schema: &Value, flow_id: &str, node_id: &str) {
2277 if input.is_null() {
2278 *input = Value::Object(serde_json::Map::new());
2279 }
2280 let Some(map) = input.as_object_mut() else {
2281 tracing::warn!(
2282 flow_id,
2283 node_id,
2284 "slot-extractor input is not an object; cannot inject slot_definitions"
2285 );
2286 return;
2287 };
2288 if map.contains_key("slot_definitions") {
2289 return;
2290 }
2291 let slot_count = slot_schema.as_array().map_or(0, Vec::len);
2292 tracing::debug!(
2293 flow_id,
2294 slot_count,
2295 "injecting flow-level slot_schema as slot_definitions into slot-extractor input"
2296 );
2297 map.insert("slot_definitions".to_string(), slot_schema.clone());
2298}
2299
2300fn resolve_card_assets(input: &mut Value, pack: &crate::pack::PackRuntime) {
2307 resolve_card_spec_asset(input, pack);
2308
2309 if let Value::Object(map) = input
2312 && let Some(Value::Object(call)) = map.get_mut("call")
2313 && let Some(payload) = call.get_mut("payload")
2314 {
2315 resolve_card_spec_asset(payload, pack);
2316 }
2317}
2318
2319fn resolve_card_spec_asset(value: &mut Value, pack: &crate::pack::PackRuntime) {
2321 let Value::Object(map) = value else { return };
2322
2323 let is_asset = map
2324 .get("card_source")
2325 .and_then(Value::as_str)
2326 .map(|s| s.eq_ignore_ascii_case("asset"))
2327 .unwrap_or(false);
2328 if !is_asset {
2329 return;
2330 }
2331
2332 let asset_path = map
2333 .get("card_spec")
2334 .and_then(|spec| spec.get("asset_path"))
2335 .and_then(Value::as_str)
2336 .map(str::to_string);
2337
2338 let Some(asset_path) = asset_path else { return };
2339
2340 match pack.read_asset(&asset_path) {
2341 Ok(bytes) => {
2342 let card_json: Value = match serde_json::from_slice(&bytes) {
2343 Ok(v) => v,
2344 Err(err) => {
2345 tracing::warn!(
2346 asset_path,
2347 %err,
2348 "failed to parse card asset as JSON; leaving as asset reference"
2349 );
2350 return;
2351 }
2352 };
2353 tracing::debug!(asset_path, "pre-resolved card asset to inline_json");
2354 map.insert("card_source".into(), Value::String("inline".into()));
2355 if let Some(Value::Object(spec)) = map.get_mut("card_spec") {
2356 spec.insert("inline_json".into(), card_json);
2357 spec.remove("asset_path");
2358 }
2359 }
2360 Err(err) => {
2361 tracing::warn!(
2362 asset_path,
2363 %err,
2364 "card asset not found in pack; leaving as asset reference"
2365 );
2366 }
2367 }
2368
2369 let configured_bundle_path = map
2376 .get("card_spec")
2377 .and_then(|spec| spec.get("i18n_bundle_path"))
2378 .and_then(Value::as_str)
2379 .map(|s| s.trim().trim_end_matches('/').to_string())
2380 .filter(|s| !s.is_empty());
2381
2382 let bundle_path = configured_bundle_path
2383 .clone()
2384 .unwrap_or_else(|| "assets/i18n".to_string());
2385
2386 let i18n_entries = load_i18n_bundle_entries(&bundle_path, |path| pack.read_asset(path));
2387
2388 if !i18n_entries.is_empty() {
2389 let locale_keys: Vec<_> = i18n_entries.keys().cloned().collect();
2390 if let Some(Value::Object(spec)) = map.get_mut("card_spec") {
2391 spec.insert("i18n_inline".into(), Value::Object(i18n_entries));
2392 if configured_bundle_path.is_some() {
2393 tracing::info!(%bundle_path, ?locale_keys, "pre-resolved i18n bundle into card_spec.i18n_inline");
2394 } else {
2395 tracing::info!(%bundle_path, ?locale_keys, "auto-discovered i18n bundle and inlined into card_spec.i18n_inline");
2396 }
2397 }
2398 }
2399}
2400
2401fn load_i18n_bundle_entries<F>(bundle_path: &str, mut read_asset: F) -> JsonMap<String, Value>
2402where
2403 F: FnMut(&str) -> Result<Vec<u8>>,
2404{
2405 let mut i18n_entries = JsonMap::new();
2406
2407 if bundle_path.ends_with(".json") {
2408 if let Ok(bytes) = read_asset(bundle_path)
2409 && let Ok(Value::Object(entries)) = serde_json::from_slice::<Value>(&bytes)
2410 {
2411 i18n_entries.insert("en".to_string(), Value::Object(entries));
2412 }
2413 return i18n_entries;
2414 }
2415
2416 let manifest_path = format!("{bundle_path}/_manifest.json");
2417 let locale_codes: Vec<String> = read_asset(&manifest_path)
2418 .ok()
2419 .and_then(|bytes| serde_json::from_slice::<Value>(&bytes).ok())
2420 .and_then(|value| {
2421 let locales = value
2422 .get("locales")
2423 .and_then(Value::as_array)
2424 .cloned()
2425 .or_else(|| value.as_array().cloned());
2426 locales.map(|items| {
2427 items
2428 .iter()
2429 .filter_map(Value::as_str)
2430 .map(String::from)
2431 .collect()
2432 })
2433 })
2434 .unwrap_or_default();
2435
2436 tracing::info!(%bundle_path, ?locale_codes, "i18n manifest discovered locales");
2437
2438 for locale in &locale_codes {
2439 let candidate = format!("{bundle_path}/{locale}.json");
2440 if let Ok(bytes) = read_asset(&candidate)
2441 && let Ok(Value::Object(entries)) = serde_json::from_slice::<Value>(&bytes)
2442 {
2443 i18n_entries.insert(locale.clone(), Value::Object(entries));
2444 }
2445 }
2446 if !i18n_entries.contains_key("en") {
2447 let en_path = format!("{bundle_path}/en.json");
2448 if let Ok(bytes) = read_asset(&en_path)
2449 && let Ok(Value::Object(entries)) = serde_json::from_slice::<Value>(&bytes)
2450 {
2451 i18n_entries.insert("en".to_string(), Value::Object(entries));
2452 }
2453 }
2454
2455 i18n_entries
2456}
2457
/// Result of evaluating a node's custom routing table (see `evaluate_custom_routing`).
#[derive(Debug)]
pub(crate) enum CustomRoutingDecision {
    /// A route was selected; the payload is the node id named by that route's `to` field.
    Next(NodeId),
    /// Returned when the routing value is not an array, or when no route was
    /// selected and none of the routes carried a condition (logged as "terminating").
    End,
    /// Returned when at least one route carried a condition but no route was
    /// selected (logged as pausing the run at the current node for resume).
    Wait,
}
2472
2473fn evaluate_custom_routing(
2486 raw: &Value,
2487 output: &NodeOutput,
2488 state: &ExecutionState,
2489 flow_ir: &HostFlow,
2490 node_id: &NodeId,
2491) -> CustomRoutingDecision {
2492 let routes = match raw.as_array() {
2493 Some(arr) => arr,
2494 None => {
2495 tracing::warn!(
2496 flow_id = %flow_ir.id,
2497 node_id = %node_id,
2498 "custom routing is not an array; terminating"
2499 );
2500 return CustomRoutingDecision::End;
2501 }
2502 };
2503
2504 let ctx = build_routing_context(output, state);
2507
2508 let mut has_condition = false;
2509 for route in routes {
2510 let condition = route.get("condition").and_then(|v| v.as_str());
2511 let to = route.get("to").and_then(|v| v.as_str());
2512
2513 if let Some(cond) = condition {
2514 has_condition = true;
2515 if evaluate_simple_condition(cond, &ctx)
2516 && let Some(target) = to
2517 && let Ok(nid) = NodeId::new(target)
2518 {
2519 tracing::debug!(
2520 flow_id = %flow_ir.id,
2521 node_id = %node_id,
2522 condition = cond,
2523 target = target,
2524 "conditional route matched"
2525 );
2526 return CustomRoutingDecision::Next(nid);
2527 }
2528 } else if let Some(target) = to
2529 && let Ok(nid) = NodeId::new(target)
2530 {
2531 tracing::debug!(
2532 flow_id = %flow_ir.id,
2533 node_id = %node_id,
2534 target = target,
2535 "default route taken"
2536 );
2537 return CustomRoutingDecision::Next(nid);
2538 }
2539 }
2540
2541 if has_condition {
2548 tracing::debug!(
2549 flow_id = %flow_ir.id,
2550 node_id = %node_id,
2551 "no conditional route matched; pausing run at current node for resume"
2552 );
2553 CustomRoutingDecision::Wait
2554 } else {
2555 tracing::warn!(
2556 flow_id = %flow_ir.id,
2557 node_id = %node_id,
2558 "no route matched and no conditions present; terminating"
2559 );
2560 CustomRoutingDecision::End
2561 }
2562}
2563
2564fn evaluate_simple_condition(condition: &str, ctx: &Value) -> bool {
2569 let (path, expected, negate) = if let Some(idx) = condition.find("==") {
2571 let path = condition[..idx].trim();
2572 let val = condition[idx + 2..].trim().trim_matches('"');
2573 (path, val, false)
2574 } else if let Some(idx) = condition.find("!=") {
2575 let path = condition[..idx].trim();
2576 let val = condition[idx + 2..].trim().trim_matches('"');
2577 (path, val, true)
2578 } else {
2579 return false;
2580 };
2581
2582 let actual = resolve_dotted_path(ctx, path);
2584 let matches = actual
2585 .as_deref()
2586 .is_some_and(|a| a.eq_ignore_ascii_case(expected));
2587 if negate { !matches } else { matches }
2588}
2589
2590fn resolve_dotted_path(value: &Value, path: &str) -> Option<String> {
2592 let parts: Vec<&str> = path.split('.').collect();
2593 let mut current = value;
2594 for part in &parts {
2595 current = current.get(part)?;
2596 }
2597 match current {
2598 Value::String(s) => Some(s.clone()),
2599 Value::Bool(b) => Some(b.to_string()),
2600 Value::Number(n) => Some(n.to_string()),
2601 _ => Some(current.to_string()),
2602 }
2603}
2604
2605fn build_routing_context(output: &NodeOutput, state: &ExecutionState) -> Value {
2623 let mut ctx = match &output.payload {
2624 Value::Object(map) => map.clone(),
2625 _ => JsonMap::new(),
2626 };
2627
2628 let entry = &state.entry;
2629 ctx.insert("entry".into(), entry.clone());
2630 ctx.insert("in".into(), entry.clone());
2631
2632 let metadata = entry
2636 .pointer("/input/metadata")
2637 .or_else(|| entry.pointer("/metadata"));
2638
2639 let mut response = JsonMap::new();
2640 if let Some(Value::Object(meta)) = metadata {
2641 for (k, v) in meta {
2642 match v {
2644 Value::String(s) => {
2645 response.insert(k.clone(), Value::String(s.clone()));
2646 }
2647 other => {
2648 response.insert(k.clone(), other.clone());
2649 }
2650 }
2651 }
2652 }
2653 if let Some(text) = entry
2655 .pointer("/input/text")
2656 .or_else(|| entry.pointer("/text"))
2657 .filter(|t| !t.is_null())
2658 {
2659 response.insert("text".into(), text.clone());
2660 }
2661 ctx.insert("response".into(), Value::Object(response));
2662
2663 Value::Object(ctx)
2664}
2665
2666#[cfg(test)]
2667mod tests {
2668 use super::*;
2669 use crate::validate::{ValidationConfig, ValidationMode};
2670 use greentic_types::{
2671 Flow, FlowComponentRef, FlowId, FlowKind, InputMapping, Node, NodeId, OutputMapping,
2672 Routing, TelemetryHints,
2673 };
2674 use serde_json::json;
2675 use std::collections::{BTreeMap, HashMap as StdHashMap};
2676 use std::str::FromStr;
2677 use std::sync::Mutex;
2678 use tokio::runtime::Runtime;
2679
2680 fn minimal_engine() -> FlowEngine {
2681 FlowEngine {
2682 packs: Vec::new(),
2683 flows: Vec::new(),
2684 flow_sources: HashMap::new(),
2685 flow_cache: RwLock::new(HashMap::new()),
2686 default_env: "local".to_string(),
2687 validation: ValidationConfig {
2688 mode: ValidationMode::Off,
2689 },
2690 cross_pack_resolver: None,
2691 rollout_ids: RolloutIds::default(),
2692 }
2693 }
2694
2695 #[test]
2696 fn templating_renders_with_partials_and_data() {
2697 let mut state = ExecutionState::new(json!({ "city": "London" }));
2698 state.nodes.insert(
2699 "forecast".to_string(),
2700 NodeOutput::new(json!({ "temp": "20C" })),
2701 );
2702
2703 let ctx = state.context();
2705 assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
2706 }
2707
2708 #[test]
2709 fn finalize_wraps_emitted_payloads() {
2710 let mut state = ExecutionState::new(json!({}));
2711 state.push_egress(json!({ "text": "first" }));
2712 state.push_egress(json!({ "text": "second" }));
2713 let result = state.finalize_with(Some(json!({ "text": "final" })));
2714 assert_eq!(
2715 result,
2716 json!([
2717 { "text": "first" },
2718 { "text": "second" },
2719 { "text": "final" }
2720 ])
2721 );
2722 }
2723
2724 #[test]
2725 fn finalize_flattens_final_array() {
2726 let mut state = ExecutionState::new(json!({}));
2727 state.push_egress(json!({ "text": "only" }));
2728 let result = state.finalize_with(Some(json!([
2729 { "text": "extra-1" },
2730 { "text": "extra-2" }
2731 ])));
2732 assert_eq!(
2733 result,
2734 json!([
2735 { "text": "only" },
2736 { "text": "extra-1" },
2737 { "text": "extra-2" }
2738 ])
2739 );
2740 }
2741
2742 #[test]
2743 fn inject_card_locale_uses_entry_metadata_without_overwriting_payload() {
2744 let mut payload = json!({
2745 "card_source": "inline",
2746 "card_spec": { "title": "Hello" }
2747 });
2748 inject_card_locale(
2749 &mut payload,
2750 &json!({"input": {"metadata": {"locale": "nl-NL"}}}),
2751 );
2752 assert_eq!(payload["locale"], json!("nl-NL"));
2753
2754 let mut existing = json!({
2755 "card_source": "inline",
2756 "card_spec": { "title": "Hello" },
2757 "locale": "en-GB"
2758 });
2759 inject_card_locale(&mut existing, &json!({"metadata": {"locale": "nl-NL"}}));
2760 assert_eq!(existing["locale"], json!("en-GB"));
2761 }
2762
2763 #[test]
2764 fn load_i18n_bundle_entries_reads_manifest_and_falls_back_to_en() {
2765 let assets = StdHashMap::from([
2766 (
2767 "cards/i18n/_manifest.json".to_string(),
2768 br#"{"locales":["de"]}"#.to_vec(),
2769 ),
2770 (
2771 "cards/i18n/de.json".to_string(),
2772 br#"{"title":"Hallo"}"#.to_vec(),
2773 ),
2774 (
2775 "cards/i18n/en.json".to_string(),
2776 br#"{"title":"Hello"}"#.to_vec(),
2777 ),
2778 ]);
2779
2780 let entries = load_i18n_bundle_entries("cards/i18n", |path| {
2781 assets
2782 .get(path)
2783 .cloned()
2784 .with_context(|| format!("missing asset {path}"))
2785 });
2786
2787 assert_eq!(entries["de"]["title"], json!("Hallo"));
2788 assert_eq!(entries["en"]["title"], json!("Hello"));
2789 }
2790
2791 #[test]
2792 fn load_i18n_bundle_entries_reads_single_file_bundle() {
2793 let entries = load_i18n_bundle_entries("cards/i18n.json", |path| {
2794 if path == "cards/i18n.json" {
2795 Ok(br#"{"title":"Hello"}"#.to_vec())
2796 } else {
2797 bail!("unexpected asset {path}");
2798 }
2799 });
2800
2801 assert_eq!(entries["en"]["title"], json!("Hello"));
2802 }
2803
2804 struct TestCrossPackResolver;
2805
2806 impl CrossPackResolver for TestCrossPackResolver {
2807 fn invoke(
2808 &self,
2809 provider_id: &str,
2810 provider_type: Option<&str>,
2811 op: &str,
2812 input: &[u8],
2813 tenant: &str,
2814 team: Option<&str>,
2815 ) -> Result<Value> {
2816 Ok(json!({
2817 "provider_id": provider_id,
2818 "provider_type": provider_type,
2819 "op": op,
2820 "tenant": tenant,
2821 "team": team,
2822 "input": serde_json::from_slice::<Value>(input)?,
2823 }))
2824 }
2825 }
2826
2827 #[test]
2828 fn cross_pack_resolver_returns_node_output_when_present() {
2829 let mut engine = minimal_engine();
2830 engine.set_cross_pack_resolver(Arc::new(TestCrossPackResolver));
2831
2832 let output = engine
2833 .try_invoke_cross_pack_resolver(
2834 Some("mail"),
2835 Some("messaging"),
2836 "send",
2837 br#"{"subject":"hello"}"#,
2838 "demo",
2839 )
2840 .expect("resolver invocation")
2841 .expect("resolver output");
2842
2843 assert_eq!(
2844 output.payload,
2845 json!({
2846 "provider_id": "mail",
2847 "provider_type": "messaging",
2848 "op": "send",
2849 "tenant": "demo",
2850 "team": null,
2851 "input": { "subject": "hello" },
2852 })
2853 );
2854 }
2855
2856 #[test]
2857 fn parse_component_control_ignores_plain_payload() {
2858 let payload = json!({
2859 "flow": "not-a-control-field",
2860 "node": "n1"
2861 });
2862 let control = parse_component_control(&payload).expect("parse control");
2863 assert!(control.is_none());
2864 }
2865
2866 #[test]
2867 fn parse_component_control_parses_jump_marker() {
2868 let payload = json!({
2869 "greentic_control": {
2870 "action": "jump",
2871 "v": 1,
2872 "flow": "flow.b",
2873 "node": "node-2",
2874 "payload": { "message": "hi" },
2875 "hints": { "k": "v" },
2876 "max_redirects": 2,
2877 "reason": "handoff"
2878 }
2879 });
2880 let control = parse_component_control(&payload)
2881 .expect("parse control")
2882 .expect("missing control");
2883 match control {
2884 NodeControl::Jump(jump) => {
2885 assert_eq!(jump.flow, "flow.b");
2886 assert_eq!(jump.node.as_deref(), Some("node-2"));
2887 assert_eq!(jump.payload, json!({ "message": "hi" }));
2888 assert_eq!(jump.hints, json!({ "k": "v" }));
2889 assert_eq!(jump.max_redirects, Some(2));
2890 assert_eq!(jump.reason.as_deref(), Some("handoff"));
2891 }
2892 other => panic!("expected jump control, got {other:?}"),
2893 }
2894 }
2895
2896 #[test]
2897 fn parse_component_control_rejects_invalid_marker() {
2898 let payload = json!({
2899 "greentic_control": "bad-shape"
2900 });
2901 let err = parse_component_control(&payload).expect_err("expected invalid marker error");
2902 assert!(err.to_string().contains("greentic_control"));
2903 }
2904
2905 #[test]
2906 fn missing_operation_reports_node_and_component() {
2907 let engine = minimal_engine();
2908 let rt = Runtime::new().unwrap();
2909 let retry_config = RetryConfig {
2910 max_attempts: 1,
2911 base_delay_ms: 1,
2912 };
2913 let ctx = FlowContext {
2914 tenant: "tenant",
2915 pack_id: "test-pack",
2916 flow_id: "flow",
2917 node_id: Some("missing-op"),
2918 tool: None,
2919 action: None,
2920 session_id: None,
2921 provider_id: None,
2922 retry_config,
2923 attempt: 1,
2924 observer: None,
2925 mocks: None,
2926 };
2927 let node = HostNode {
2928 kind: NodeKind::Exec {
2929 target_component: "qa.process".into(),
2930 },
2931 component: "component.exec".into(),
2932 component_id: "component.exec".into(),
2933 operation_name: None,
2934 operation_in_mapping: None,
2935 payload_expr: Value::Null,
2936 routing: Routing::End,
2937 };
2938 let _state = ExecutionState::new(Value::Null);
2939 let payload = json!({ "component": "qa.process" });
2940 let event = NodeEvent {
2941 context: &ctx,
2942 node_id: "missing-op",
2943 node: &node,
2944 payload: &payload,
2945 };
2946 let err = rt
2947 .block_on(engine.execute_component_exec(
2948 &ctx,
2949 "missing-op",
2950 &node,
2951 payload.clone(),
2952 &event,
2953 ComponentOverrides {
2954 component: None,
2955 operation: None,
2956 },
2957 ))
2958 .unwrap_err();
2959 let message = err.to_string();
2960 assert!(
2961 message.contains("missing operation for node `missing-op`"),
2962 "unexpected message: {message}"
2963 );
2964 assert!(
2965 message.contains("(component `component.exec`)"),
2966 "unexpected message: {message}"
2967 );
2968 }
2969
2970 #[test]
2971 fn missing_operation_mentions_mapping_hint() {
2972 let engine = minimal_engine();
2973 let rt = Runtime::new().unwrap();
2974 let retry_config = RetryConfig {
2975 max_attempts: 1,
2976 base_delay_ms: 1,
2977 };
2978 let ctx = FlowContext {
2979 tenant: "tenant",
2980 pack_id: "test-pack",
2981 flow_id: "flow",
2982 node_id: Some("missing-op-hint"),
2983 tool: None,
2984 action: None,
2985 session_id: None,
2986 provider_id: None,
2987 retry_config,
2988 attempt: 1,
2989 observer: None,
2990 mocks: None,
2991 };
2992 let node = HostNode {
2993 kind: NodeKind::Exec {
2994 target_component: "qa.process".into(),
2995 },
2996 component: "component.exec".into(),
2997 component_id: "component.exec".into(),
2998 operation_name: None,
2999 operation_in_mapping: Some("render".into()),
3000 payload_expr: Value::Null,
3001 routing: Routing::End,
3002 };
3003 let _state = ExecutionState::new(Value::Null);
3004 let payload = json!({ "component": "qa.process" });
3005 let event = NodeEvent {
3006 context: &ctx,
3007 node_id: "missing-op-hint",
3008 node: &node,
3009 payload: &payload,
3010 };
3011 let err = rt
3012 .block_on(engine.execute_component_exec(
3013 &ctx,
3014 "missing-op-hint",
3015 &node,
3016 payload.clone(),
3017 &event,
3018 ComponentOverrides {
3019 component: None,
3020 operation: None,
3021 },
3022 ))
3023 .unwrap_err();
3024 let message = err.to_string();
3025 assert!(
3026 message.contains("missing operation for node `missing-op-hint`"),
3027 "unexpected message: {message}"
3028 );
3029 assert!(
3030 message.contains("Found operation in input.mapping (`render`)"),
3031 "unexpected message: {message}"
3032 );
3033 }
3034
3035 struct CountingObserver {
3036 starts: Mutex<Vec<String>>,
3037 ends: Mutex<Vec<Value>>,
3038 }
3039
3040 impl CountingObserver {
3041 fn new() -> Self {
3042 Self {
3043 starts: Mutex::new(Vec::new()),
3044 ends: Mutex::new(Vec::new()),
3045 }
3046 }
3047 }
3048
3049 impl ExecutionObserver for CountingObserver {
3050 fn on_node_start(&self, event: &NodeEvent<'_>) {
3051 self.starts.lock().unwrap().push(event.node_id.to_string());
3052 }
3053
3054 fn on_node_end(&self, _event: &NodeEvent<'_>, output: &Value) {
3055 self.ends.lock().unwrap().push(output.clone());
3056 }
3057
3058 fn on_node_error(&self, _event: &NodeEvent<'_>, _error: &dyn StdError) {}
3059 }
3060
3061 #[test]
3062 fn emits_end_event_for_successful_node() {
3063 let node_id = NodeId::from_str("emit").unwrap();
3064 let node = Node {
3065 id: node_id.clone(),
3066 component: FlowComponentRef {
3067 id: "emit.log".parse().unwrap(),
3068 pack_alias: None,
3069 operation: None,
3070 },
3071 input: InputMapping {
3072 mapping: json!({ "message": "logged" }),
3073 },
3074 output: OutputMapping {
3075 mapping: Value::Null,
3076 },
3077 err_map: None,
3078 routing: Routing::End,
3079 telemetry: TelemetryHints::default(),
3080 };
3081 let mut nodes = indexmap::IndexMap::default();
3082 nodes.insert(node_id.clone(), node);
3083 let flow = Flow {
3084 schema_version: "1.0".into(),
3085 id: FlowId::from_str("emit.flow").unwrap(),
3086 kind: FlowKind::Messaging,
3087 entrypoints: BTreeMap::from([(
3088 "default".to_string(),
3089 Value::String(node_id.to_string()),
3090 )]),
3091 nodes,
3092 metadata: Default::default(),
3093 };
3094 let host_flow = HostFlow::from(flow);
3095
3096 let engine = FlowEngine {
3097 packs: Vec::new(),
3098 flows: Vec::new(),
3099 flow_sources: HashMap::new(),
3100 flow_cache: RwLock::new(HashMap::from([(
3101 FlowKey {
3102 pack_id: "test-pack".to_string(),
3103 flow_id: "emit.flow".to_string(),
3104 },
3105 host_flow,
3106 )])),
3107 default_env: "local".to_string(),
3108 validation: ValidationConfig {
3109 mode: ValidationMode::Off,
3110 },
3111 cross_pack_resolver: None,
3112 rollout_ids: RolloutIds::default(),
3113 };
3114 let observer = CountingObserver::new();
3115 let ctx = FlowContext {
3116 tenant: "demo",
3117 pack_id: "test-pack",
3118 flow_id: "emit.flow",
3119 node_id: None,
3120 tool: None,
3121 action: None,
3122 session_id: None,
3123 provider_id: None,
3124 retry_config: RetryConfig {
3125 max_attempts: 1,
3126 base_delay_ms: 1,
3127 },
3128 attempt: 1,
3129 observer: Some(&observer),
3130 mocks: None,
3131 };
3132
3133 let rt = Runtime::new().unwrap();
3134 let result = rt.block_on(engine.execute(ctx, Value::Null)).unwrap();
3135 assert!(matches!(result.status, FlowStatus::Completed));
3136
3137 let starts = observer.starts.lock().unwrap();
3138 let ends = observer.ends.lock().unwrap();
3139 assert_eq!(starts.len(), 1);
3140 assert_eq!(ends.len(), 1);
3141 assert_eq!(ends[0], json!({ "message": "logged" }));
3142 }
3143
3144 #[test]
3145 fn dotted_component_id_with_mapping_operation_is_not_split() {
3146 let node = Node {
3152 id: NodeId::from_str("render").unwrap(),
3153 component: FlowComponentRef {
3154 id: "ai.greentic.component-templates".parse().unwrap(),
3155 pack_alias: None,
3156 operation: None,
3157 },
3158 input: InputMapping {
3159 mapping: json!({ "operation": "handle_message", "input": "hi" }),
3160 },
3161 output: OutputMapping {
3162 mapping: Value::Null,
3163 },
3164 err_map: None,
3165 routing: Routing::End,
3166 telemetry: TelemetryHints::default(),
3167 };
3168 let host = HostNode::from(node);
3169 assert!(
3170 matches!(&host.kind, NodeKind::PackComponent { component_ref } if component_ref == "ai.greentic.component-templates"),
3171 "dotted component id must stay intact, got kind {:?}",
3172 host.kind
3173 );
3174 assert_eq!(host.component, "ai.greentic.component-templates");
3175 assert_eq!(host.operation_in_mapping(), Some("handle_message"));
3176 }
3177
3178 #[test]
3179 fn packed_component_operation_id_still_splits_without_mapping_operation() {
3180 let node = Node {
3184 id: NodeId::from_str("render").unwrap(),
3185 component: FlowComponentRef {
3186 id: "templating.handlebars".parse().unwrap(),
3187 pack_alias: None,
3188 operation: None,
3189 },
3190 input: InputMapping {
3191 mapping: json!({ "text": "hello" }),
3192 },
3193 output: OutputMapping {
3194 mapping: Value::Null,
3195 },
3196 err_map: None,
3197 routing: Routing::End,
3198 telemetry: TelemetryHints::default(),
3199 };
3200 let host = HostNode::from(node);
3201 assert!(
3202 matches!(&host.kind, NodeKind::PackComponent { component_ref } if component_ref == "templating"),
3203 "packed <component>.<operation> id must split, got kind {:?}",
3204 host.kind
3205 );
3206 assert_eq!(host.operation_name(), Some("handlebars"));
3207 }
3208
3209 fn host_flow_for_test(
3210 flow_id: &str,
3211 node_ids: &[&str],
3212 default_start: Option<&str>,
3213 ) -> HostFlow {
3214 let mut nodes = indexmap::IndexMap::default();
3215 for node_id in node_ids {
3216 let id = NodeId::from_str(node_id).unwrap();
3217 let node = Node {
3218 id: id.clone(),
3219 component: FlowComponentRef {
3220 id: "emit.log".parse().unwrap(),
3221 pack_alias: None,
3222 operation: None,
3223 },
3224 input: InputMapping {
3225 mapping: json!({ "message": node_id }),
3226 },
3227 output: OutputMapping {
3228 mapping: Value::Null,
3229 },
3230 err_map: None,
3231 routing: Routing::End,
3232 telemetry: TelemetryHints::default(),
3233 };
3234 nodes.insert(id, node);
3235 }
3236 let mut entrypoints = BTreeMap::new();
3237 if let Some(start) = default_start {
3238 entrypoints.insert("default".to_string(), Value::String(start.to_string()));
3239 }
3240 HostFlow::from(Flow {
3241 schema_version: "1.0".into(),
3242 id: FlowId::from_str(flow_id).unwrap(),
3243 kind: FlowKind::Messaging,
3244 entrypoints,
3245 nodes,
3246 metadata: Default::default(),
3247 })
3248 }
3249
3250 fn jump_test_engine() -> FlowEngine {
3251 let target_flow = host_flow_for_test("flow.target", &["node-a", "node-b"], None);
3252 FlowEngine {
3253 packs: Vec::new(),
3254 flows: Vec::new(),
3255 flow_sources: HashMap::new(),
3256 flow_cache: RwLock::new(HashMap::from([(
3257 FlowKey {
3258 pack_id: "test-pack".to_string(),
3259 flow_id: "flow.target".to_string(),
3260 },
3261 target_flow,
3262 )])),
3263 default_env: "local".to_string(),
3264 validation: ValidationConfig {
3265 mode: ValidationMode::Off,
3266 },
3267 cross_pack_resolver: None,
3268 rollout_ids: RolloutIds::default(),
3269 }
3270 }
3271
3272 fn jump_ctx<'a>(flow_id: &'a str) -> FlowContext<'a> {
3273 FlowContext {
3274 tenant: "demo",
3275 pack_id: "test-pack",
3276 flow_id,
3277 node_id: None,
3278 tool: None,
3279 action: None,
3280 session_id: None,
3281 provider_id: None,
3282 retry_config: RetryConfig {
3283 max_attempts: 1,
3284 base_delay_ms: 1,
3285 },
3286 attempt: 1,
3287 observer: None,
3288 mocks: None,
3289 }
3290 }
3291
3292 #[test]
3293 fn with_rollout_ids_binds_revision_identity() {
3294 let engine = minimal_engine().with_rollout_ids(RolloutIds {
3295 customer_id: Some("cust-acme".into()),
3296 deployment_id: Some("01JTKS".into()),
3297 bundle_id: Some("customer.support".into()),
3298 revision_id: Some("01JTKR".into()),
3299 });
3300 assert_eq!(engine.rollout_ids.revision_id.as_deref(), Some("01JTKR"));
3301 assert_eq!(engine.rollout_ids.deployment_id.as_deref(), Some("01JTKS"));
3302 assert!(minimal_engine().rollout_ids.is_empty());
3304 }
3305
3306 #[test]
3307 fn apply_jump_unknown_flow_errors() {
3308 let engine = minimal_engine();
3309 let mut state = ExecutionState::new(Value::Null);
3310 let rt = Runtime::new().unwrap();
3311 let err = rt
3312 .block_on(engine.apply_jump(
3313 &jump_ctx("flow.source"),
3314 &mut state,
3315 JumpControl {
3316 flow: "flow.missing".into(),
3317 node: None,
3318 payload: json!({ "ok": true }),
3319 hints: Value::Null,
3320 max_redirects: None,
3321 reason: None,
3322 },
3323 ))
3324 .unwrap_err();
3325 assert!(
3326 err.to_string().contains("unknown_flow"),
3327 "unexpected error: {err}"
3328 );
3329 }
3330
3331 #[test]
3332 fn apply_jump_unknown_node_errors() {
3333 let engine = jump_test_engine();
3334 let mut state = ExecutionState::new(Value::Null);
3335 let rt = Runtime::new().unwrap();
3336 let err = rt
3337 .block_on(engine.apply_jump(
3338 &jump_ctx("flow.source"),
3339 &mut state,
3340 JumpControl {
3341 flow: "flow.target".into(),
3342 node: Some("node-missing".into()),
3343 payload: json!({ "ok": true }),
3344 hints: Value::Null,
3345 max_redirects: None,
3346 reason: None,
3347 },
3348 ))
3349 .unwrap_err();
3350 assert!(
3351 err.to_string().contains("unknown_node"),
3352 "unexpected error: {err}"
3353 );
3354 }
3355
3356 #[test]
3357 fn apply_jump_uses_default_start_fallback() {
3358 let engine = jump_test_engine();
3359 let mut state = ExecutionState::new(Value::Null);
3360 let rt = Runtime::new().unwrap();
3361 let target = rt
3362 .block_on(engine.apply_jump(
3363 &jump_ctx("flow.source"),
3364 &mut state,
3365 JumpControl {
3366 flow: "flow.target".into(),
3367 node: None,
3368 payload: json!({ "k": "v" }),
3369 hints: Value::Null,
3370 max_redirects: None,
3371 reason: None,
3372 },
3373 ))
3374 .expect("jump target");
3375 assert_eq!(target.flow_id, "flow.target");
3376 assert_eq!(target.node_id.as_str(), "node-a");
3377 }
3378
3379 #[test]
3380 fn apply_jump_redirect_limit_enforced() {
3381 let engine = jump_test_engine();
3382 let mut state = ExecutionState::new(Value::Null);
3383 state.redirect_count = 3;
3384 let rt = Runtime::new().unwrap();
3385 let err = rt
3386 .block_on(engine.apply_jump(
3387 &jump_ctx("flow.source"),
3388 &mut state,
3389 JumpControl {
3390 flow: "flow.target".into(),
3391 node: None,
3392 payload: json!({ "k": "v" }),
3393 hints: Value::Null,
3394 max_redirects: Some(3),
3395 reason: None,
3396 },
3397 ))
3398 .unwrap_err();
3399 assert_eq!(err.to_string(), "redirect_limit");
3400 }
3401
3402 #[test]
3409 fn evaluate_custom_routing_waits_when_conditional_falls_through() {
3410 let raw_routing = json!([
3411 { "condition": "response.action == \"go\"", "to": "next" },
3412 { "out": true }
3413 ]);
3414 let flow_ir = HostFlow {
3415 id: "flow.test".to_string(),
3416 start: None,
3417 nodes: IndexMap::new(),
3418 slot_schema: None,
3419 };
3420 let current_node = NodeId::from_str("current").unwrap();
3421 let output = NodeOutput::new(Value::Null);
3422
3423 let mut state_empty = ExecutionState::new(json!({ "metadata": { "action": "" } }));
3425 state_empty.entry = json!({ "metadata": { "action": "" } });
3426 let decision_empty =
3427 evaluate_custom_routing(&raw_routing, &output, &state_empty, &flow_ir, ¤t_node);
3428 assert!(
3429 matches!(decision_empty, CustomRoutingDecision::Wait),
3430 "expected Wait on conditional fall-through, got {decision_empty:?}"
3431 );
3432
3433 let mut state_go = ExecutionState::new(json!({ "metadata": { "action": "go" } }));
3435 state_go.entry = json!({ "metadata": { "action": "go" } });
3436 let decision_go =
3437 evaluate_custom_routing(&raw_routing, &output, &state_go, &flow_ir, ¤t_node);
3438 match decision_go {
3439 CustomRoutingDecision::Next(nid) => assert_eq!(nid.as_str(), "next"),
3440 other => panic!("expected Next(\"next\"), got {other:?}"),
3441 }
3442 }
3443
3444 #[test]
3445 fn node_output_with_error_marks_ok_false_and_stashes_in_meta() {
3446 let err: Box<dyn std::error::Error + 'static> =
3447 Box::<dyn std::error::Error + 'static>::from("weatherapi returned 401 Unauthorized");
3448 let out = NodeOutput::with_error("call_weather", err.as_ref());
3449 assert!(!out.ok);
3450 assert_eq!(out.payload, Value::Null);
3451 assert_eq!(out.meta["error"]["kind"], "flow_node_failed");
3452 assert_eq!(out.meta["error"]["node_id"], "call_weather");
3453 assert_eq!(
3454 out.meta["error"]["message"],
3455 "weatherapi returned 401 Unauthorized"
3456 );
3457 }
3458
3459 #[test]
3460 fn lift_first_node_error_promotes_node_meta_to_output_metadata() {
3461 let mut nodes: HashMap<String, NodeOutput> = HashMap::new();
3466 let err: Box<dyn std::error::Error + 'static> =
3467 Box::<dyn std::error::Error + 'static>::from("weatherapi returned 401 Unauthorized");
3468 nodes.insert(
3469 "call_weather".to_string(),
3470 NodeOutput::with_error("call_weather", err.as_ref()),
3471 );
3472 nodes.insert(
3473 "render_current_card".to_string(),
3474 NodeOutput::new(json!({ "text": "message" })),
3475 );
3476
3477 let final_output = json!({ "text": "message" });
3478 let enriched = lift_first_node_error_from_nodes(final_output, &nodes);
3479 assert_eq!(
3480 enriched["metadata"]["error_kind"], "flow_node_failed",
3481 "first failing node's kind must be lifted"
3482 );
3483 assert_eq!(
3484 enriched["metadata"]["error_message"],
3485 "weatherapi returned 401 Unauthorized"
3486 );
3487 assert_eq!(enriched["metadata"]["node_id"], "call_weather");
3488 assert_eq!(enriched["text"], "message");
3491 }
3492
3493 #[test]
3494 fn lift_first_node_error_is_noop_when_all_nodes_ok() {
3495 let mut nodes: HashMap<String, NodeOutput> = HashMap::new();
3496 nodes.insert(
3497 "ok_node".to_string(),
3498 NodeOutput::new(json!({ "text": "all good" })),
3499 );
3500 let output = json!({ "text": "all good" });
3501 let lifted = lift_first_node_error_from_nodes(output.clone(), &nodes);
3502 assert_eq!(lifted, output);
3503 }
3504
3505 #[tokio::test]
3506 async fn execute_user_facing_flow_failure_returns_completed_with_error_envelope() {
3507 let flow_id_str = "broken.flow";
3512 let pack_id_str = "test-pack";
3513 let host_flow = host_flow_for_test(flow_id_str, &["only-node"], Some("does-not-exist"));
3514 let engine = FlowEngine {
3515 packs: Vec::new(),
3516 flows: Vec::new(),
3517 flow_sources: HashMap::new(),
3518 flow_cache: RwLock::new(HashMap::from([(
3519 FlowKey {
3520 pack_id: pack_id_str.to_string(),
3521 flow_id: flow_id_str.to_string(),
3522 },
3523 host_flow,
3524 )])),
3525 default_env: "local".to_string(),
3526 validation: ValidationConfig {
3527 mode: ValidationMode::Off,
3528 },
3529 cross_pack_resolver: None,
3530 rollout_ids: RolloutIds::default(),
3531 };
3532 let ctx = FlowContext {
3533 tenant: "demo",
3534 pack_id: pack_id_str,
3535 flow_id: flow_id_str,
3536 node_id: None,
3537 tool: None,
3538 action: None,
3539 session_id: Some("conv-1"),
3540 provider_id: None,
3541 retry_config: RetryConfig {
3542 max_attempts: 1,
3543 base_delay_ms: 1,
3544 },
3545 attempt: 1,
3546 observer: None,
3547 mocks: None,
3548 };
3549 let result = engine
3550 .execute(ctx, Value::Null)
3551 .await
3552 .expect("must not propagate Err");
3553 assert!(matches!(result.status, FlowStatus::Completed));
3554 assert_eq!(
3555 result.output["metadata"]["error_kind"],
3556 "flow_execution_failed"
3557 );
3558 let msg = result.output["metadata"]["error_message"]
3559 .as_str()
3560 .unwrap_or("");
3561 assert!(!msg.is_empty(), "error_message must be populated");
3562 assert_eq!(result.output["metadata"]["flow_id"], "broken.flow");
3563 }
3564
3565 #[test]
3566 fn mcp_tool_error_recognises_generator_error_shape() {
3567 let value = json!({
3570 "error": {
3571 "code": "tool_error",
3572 "message": "API request returned status 401",
3573 "status": 401
3574 }
3575 });
3576 let (code, message) = mcp_tool_error(&value).expect("must detect MCP error shape");
3577 assert_eq!(code, "tool_error");
3578 assert!(message.contains("API request returned status 401"));
3579 assert!(message.contains("(status 401)"));
3580 }
3581
3582 #[test]
3583 fn mcp_tool_error_skips_success_responses() {
3584 let value = json!({ "result": { "current": { "temp_c": 19.0 } } });
3586 assert!(mcp_tool_error(&value).is_none());
3587 }
3588
3589 #[test]
3590 fn mcp_tool_error_skips_non_object_and_unrelated_shapes() {
3591 assert!(mcp_tool_error(&Value::Null).is_none());
3592 assert!(mcp_tool_error(&json!({"unrelated": true})).is_none());
3593 assert!(mcp_tool_error(&json!({"error": "oops"})).is_none());
3595 }
3596
3597 #[tokio::test]
3598 async fn execute_non_user_facing_flow_failure_still_propagates() {
3599 let flow_id_str = "broken.flow";
3602 let pack_id_str = "test-pack";
3603 let host_flow = host_flow_for_test(flow_id_str, &["only-node"], Some("does-not-exist"));
3604 let engine = FlowEngine {
3605 packs: Vec::new(),
3606 flows: Vec::new(),
3607 flow_sources: HashMap::new(),
3608 flow_cache: RwLock::new(HashMap::from([(
3609 FlowKey {
3610 pack_id: pack_id_str.to_string(),
3611 flow_id: flow_id_str.to_string(),
3612 },
3613 host_flow,
3614 )])),
3615 default_env: "local".to_string(),
3616 validation: ValidationConfig {
3617 mode: ValidationMode::Off,
3618 },
3619 cross_pack_resolver: None,
3620 rollout_ids: RolloutIds::default(),
3621 };
3622 let ctx = FlowContext {
3623 tenant: "demo",
3624 pack_id: pack_id_str,
3625 flow_id: flow_id_str,
3626 node_id: None,
3627 tool: None,
3628 action: None,
3629 session_id: None,
3630 provider_id: None,
3631 retry_config: RetryConfig {
3632 max_attempts: 1,
3633 base_delay_ms: 1,
3634 },
3635 attempt: 1,
3636 observer: None,
3637 mocks: None,
3638 };
3639 let result = engine.execute(ctx, Value::Null).await;
3640 assert!(result.is_err(), "non-user-facing flow must propagate Err");
3641 }
3642
/// A slot schema stored in `metadata.extra` under `SLOT_SCHEMA_METADATA_KEY`
/// must show up as `HostFlow::slot_schema` after conversion.
#[test]
fn host_flow_extracts_slot_schema_from_metadata_extra() {
    use greentic_types::FlowMetadata;
    use std::collections::BTreeSet;

    let schema = json!([
        {"name": "counterparty", "slot_type": "string", "required": true},
        {"name": "due_date", "slot_type": "date", "required": true}
    ]);

    // Build the metadata first so the flow literal below stays compact.
    let metadata = FlowMetadata {
        title: None,
        description: None,
        tags: BTreeSet::new(),
        extra: json!({(SLOT_SCHEMA_METADATA_KEY): schema}),
    };
    let flow = Flow {
        schema_version: "flow-v1".into(),
        id: FlowId::from_str("test.flow").unwrap(),
        kind: FlowKind::Messaging,
        entrypoints: BTreeMap::new(),
        nodes: IndexMap::default(),
        metadata,
    };

    let host: HostFlow = flow.into();
    let extracted = host.slot_schema.as_ref();
    assert_eq!(
        extracted,
        Some(&schema),
        "HostFlow must extract slot_schema from metadata.extra"
    );
}
3674
/// A flow with default metadata must convert to a `HostFlow` whose
/// `slot_schema` is `None`.
#[test]
fn host_flow_slot_schema_is_none_when_absent() {
    let bare_flow = Flow {
        schema_version: "flow-v1".into(),
        id: FlowId::from_str("test.flow").unwrap(),
        kind: FlowKind::Messaging,
        entrypoints: BTreeMap::new(),
        nodes: IndexMap::default(),
        metadata: Default::default(),
    };

    let host: HostFlow = bare_flow.into();
    let schema_missing = host.slot_schema.is_none();
    assert!(
        schema_missing,
        "HostFlow.slot_schema must be None when metadata.extra has no greentic.slot_schema"
    );
}
3691
/// Injecting into an object input must add a `slot_definitions` key holding
/// the schema while keeping the existing keys.
#[test]
fn inject_slot_definitions_adds_to_object_input() {
    let schema = json!([
        {"name": "city", "slot_type": "string"}
    ]);
    let mut node_input = json!({"utterance": "hello"});

    inject_slot_definitions(&mut node_input, &schema, "f", "n");

    let expected = json!({"utterance": "hello", "slot_definitions": schema});
    assert_eq!(
        node_input, expected,
        "slot_definitions must be injected into existing object"
    );
}
3705
/// Injecting into `Value::Null` must yield an object that contains only
/// `slot_definitions`.
#[test]
fn inject_slot_definitions_wraps_null_input() {
    let schema = json!([{"name": "x", "slot_type": "string"}]);
    let mut node_input = Value::Null;

    inject_slot_definitions(&mut node_input, &schema, "f", "n");

    let expected = json!({"slot_definitions": schema});
    assert_eq!(
        node_input, expected,
        "null input must become an object with slot_definitions"
    );
}
3717
/// When the input already carries `slot_definitions`, injection must leave
/// that inline value in place instead of replacing it with the flow schema.
#[test]
fn inject_slot_definitions_preserves_explicit_inline() {
    let flow_schema = json!([{"name": "city", "slot_type": "string"}]);
    let inline_defs = json!([{"name": "country", "slot_type": "string"}]);
    let mut node_input = json!({
        "utterance": "hello",
        "slot_definitions": inline_defs
    });

    inject_slot_definitions(&mut node_input, &flow_schema, "f", "n");

    assert_eq!(
        node_input["slot_definitions"], inline_defs,
        "explicit inline slot_definitions must not be overwritten"
    );
}
3732
/// A string input is neither an object nor null, so injection must leave it
/// exactly as it was.
#[test]
fn inject_slot_definitions_skips_non_object_input() {
    let schema = json!([{"name": "x", "slot_type": "string"}]);
    let mut node_input = json!("a string");

    inject_slot_definitions(&mut node_input, &schema, "f", "n");

    let untouched = json!("a string");
    assert_eq!(
        node_input, untouched,
        "non-object input must be left unchanged"
    );
}
3744
3745 fn make_flow_doc_for_test(
3746 id: &str,
3747 node_name: &str,
3748 component: &str,
3749 slot_schema: Option<Value>,
3750 ) -> greentic_flow::model::FlowDoc {
3751 use greentic_flow::model::{FlowDoc, NodeDoc};
3752
3753 let mut nodes = IndexMap::new();
3754 nodes.insert(
3755 node_name.to_string(),
3756 NodeDoc {
3757 raw: {
3758 let mut m = IndexMap::new();
3759 m.insert(
3760 "component.exec".to_string(),
3761 json!({ "component": component }),
3762 );
3763 m
3764 },
3765 routing: json!([{ "out": true }]),
3766 ..Default::default()
3767 },
3768 );
3769
3770 FlowDoc {
3771 id: id.into(),
3772 title: None,
3773 description: None,
3774 flow_type: "messaging".into(),
3775 start: Some(node_name.into()),
3776 parameters: json!({}),
3777 tags: Vec::new(),
3778 schema_version: None,
3779 entrypoints: IndexMap::new(),
3780 meta: None,
3781 slot_schema,
3782 nodes,
3783 }
3784 }
3785
/// A `FlowDoc` slot schema must be forwarded by `compile_flow` into
/// `metadata.extra` and then picked up by the `HostFlow` conversion.
#[test]
fn compile_flow_round_trips_slot_schema_into_host_flow() {
    let slot_defs = json!([
        { "name": "counterparty", "slot_type": "string", "required": true,
          "pattern": ".+" },
        { "name": "due_date", "slot_type": "date", "required": true,
          "pattern": "\\d{4}-\\d{2}-\\d{2}" }
    ]);
    let doc = make_flow_doc_for_test(
        "slot-test",
        "extractor",
        "slot-extractor",
        Some(slot_defs.clone()),
    );

    // Stage 1: FlowDoc -> Flow.
    let flow = greentic_flow::compile_flow(doc).expect("compile_flow must succeed");
    let forwarded = flow.metadata.extra.get(SLOT_SCHEMA_METADATA_KEY);
    assert_eq!(
        forwarded,
        Some(&slot_defs),
        "compile_flow must forward slot_schema into metadata.extra"
    );

    // Stage 2: Flow -> HostFlow.
    let host: HostFlow = flow.into();
    let extracted = host.slot_schema.as_ref();
    assert_eq!(
        extracted,
        Some(&slot_defs),
        "HostFlow.slot_schema must survive the compile_flow -> HostFlow round-trip"
    );
}
3820
/// Without a `FlowDoc` slot schema, neither the compiled flow's
/// `metadata.extra` nor the resulting `HostFlow` may carry one.
#[test]
fn compile_flow_without_slot_schema_leaves_host_flow_none() {
    let doc = make_flow_doc_for_test("no-slots", "echo", "echo", None);

    let flow = greentic_flow::compile_flow(doc).expect("compile_flow must succeed");
    let forwarded = flow.metadata.extra.get(SLOT_SCHEMA_METADATA_KEY);
    assert!(
        forwarded.is_none(),
        "metadata.extra must not contain greentic.slot_schema when FlowDoc.slot_schema is None"
    );

    let host: HostFlow = flow.into();
    let schema_missing = host.slot_schema.is_none();
    assert!(
        schema_missing,
        "HostFlow.slot_schema must be None when FlowDoc has no slot_schema"
    );
}
3840}
3841
3842use tracing::Instrument;
3843
3844pub struct FlowContext<'a> {
3845 pub tenant: &'a str,
3846 pub pack_id: &'a str,
3847 pub flow_id: &'a str,
3848 pub node_id: Option<&'a str>,
3849 pub tool: Option<&'a str>,
3850 pub action: Option<&'a str>,
3851 pub session_id: Option<&'a str>,
3852 pub provider_id: Option<&'a str>,
3853 pub retry_config: RetryConfig,
3854 pub attempt: u32,
3855 pub observer: Option<&'a dyn ExecutionObserver>,
3856 pub mocks: Option<&'a MockLayer>,
3857}
3858
/// Retry settings carried by [`FlowContext`].
///
/// The type is `Copy`, so it can be handed around by value.
#[derive(Clone, Copy)]
pub struct RetryConfig {
    /// Upper bound on the number of attempts.
    pub max_attempts: u32,
    /// Base delay in milliseconds. NOTE(review): presumably the seed for the
    /// backoff computation — confirm against the retry loop.
    pub base_delay_ms: u64,
}
3864
3865fn lift_first_node_error_from_nodes(output: Value, nodes: &HashMap<String, NodeOutput>) -> Value {
3880 let Some((node_id, failed)) = nodes.iter().find(|(_, out)| !out.ok) else {
3881 return output;
3882 };
3883 let err_meta = failed.meta.get("error");
3884 let message = err_meta
3885 .and_then(|e| e.get("message"))
3886 .and_then(|v| v.as_str())
3887 .unwrap_or("flow node failed");
3888 let kind = err_meta
3889 .and_then(|e| e.get("kind"))
3890 .and_then(|v| v.as_str())
3891 .unwrap_or("flow_node_failed");
3892
3893 let mut output = match output {
3894 Value::Object(map) => map,
3895 Value::Null => JsonMap::new(),
3896 other => {
3897 let mut wrap = JsonMap::new();
3898 wrap.insert("payload".to_string(), other);
3899 wrap
3900 }
3901 };
3902 let metadata_entry = output
3903 .entry("metadata".to_string())
3904 .or_insert_with(|| Value::Object(JsonMap::new()));
3905 let metadata_map = match metadata_entry {
3906 Value::Object(map) => map,
3907 _ => {
3908 *metadata_entry = Value::Object(JsonMap::new());
3909 metadata_entry.as_object_mut().unwrap()
3910 }
3911 };
3912 metadata_map
3913 .entry("error_kind".to_string())
3914 .or_insert(Value::String(kind.to_string()));
3915 metadata_map
3916 .entry("error_message".to_string())
3917 .or_insert(Value::String(message.to_string()));
3918 metadata_map
3919 .entry("node_id".to_string())
3920 .or_insert(Value::String(node_id.clone()));
3921 Value::Object(output)
3922}
3923
3924fn should_retry(err: &anyhow::Error) -> bool {
3925 let lower = err.to_string().to_lowercase();
3926 lower.contains("transient")
3927 || lower.contains("unavailable")
3928 || lower.contains("internal")
3929 || lower.contains("timeout")
3930}
3931
3932impl From<FlowRetryConfig> for RetryConfig {
3933 fn from(value: FlowRetryConfig) -> Self {
3934 Self {
3935 max_attempts: value.max_attempts.max(1),
3936 base_delay_ms: value.base_delay_ms.max(50),
3937 }
3938 }
3939}