1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use super::templating::{TemplateOptions, render_template_value};
18use crate::config::{FlowRetryConfig, HostConfig};
19use crate::pack::{FlowDescriptor, PackRuntime};
20use crate::runner::invocation::{InvocationMeta, build_invocation_envelope};
21use crate::telemetry::{
22 FlowSpanAttributes, RolloutIds, annotate_span, backoff_delay_ms, set_flow_context,
23};
24#[cfg(feature = "fault-injection")]
25use crate::testing::fault_injection::{FaultContext, FaultPoint, maybe_fail};
26use crate::validate::{
27 ValidationConfig, ValidationIssue, ValidationMode, validate_component_envelope,
28 validate_tool_envelope,
29};
30use greentic_flow::SLOT_SCHEMA_METADATA_KEY;
31use greentic_types::{Flow, Node, NodeId, Routing};
32
33const SLOT_EXTRACTOR_COMPONENT_ID: &str = "ai.greentic.component-slot-extractor";
37
38pub trait CrossPackResolver: Send + Sync {
45 fn invoke(
46 &self,
47 provider_id: &str,
48 provider_type: Option<&str>,
49 op: &str,
50 input: &[u8],
51 tenant: &str,
52 team: Option<&str>,
53 ) -> Result<Value>;
54}
55
56pub struct FlowEngine {
57 packs: Vec<Arc<PackRuntime>>,
58 flows: Vec<FlowDescriptor>,
59 flow_sources: HashMap<FlowKey, usize>,
60 flow_cache: RwLock<HashMap<FlowKey, HostFlow>>,
61 default_env: String,
62 validation: ValidationConfig,
63 cross_pack_resolver: Option<Arc<dyn CrossPackResolver>>,
64 rollout_ids: RolloutIds,
69}
70
71#[derive(Clone, Debug, PartialEq, Eq, Hash)]
72struct FlowKey {
73 pack_id: String,
74 flow_id: String,
75}
76
77#[derive(Clone, Debug, Serialize, Deserialize)]
78pub struct FlowSnapshot {
79 pub pack_id: String,
80 pub flow_id: String,
81 #[serde(default, skip_serializing_if = "Option::is_none")]
82 pub next_flow: Option<String>,
83 pub next_node: String,
84 pub state: ExecutionState,
85}
86
87#[derive(Clone, Debug)]
88pub struct FlowWait {
89 pub reason: Option<String>,
90 pub snapshot: FlowSnapshot,
91}
92
93#[derive(Clone, Debug)]
94pub enum FlowStatus {
95 Completed,
96 Waiting(Box<FlowWait>),
97}
98
99#[derive(Clone, Debug)]
100pub struct FlowExecution {
101 pub output: Value,
102 pub status: FlowStatus,
103}
104
105#[derive(Clone, Debug)]
106struct HostFlow {
107 id: String,
108 start: Option<NodeId>,
109 nodes: IndexMap<NodeId, HostNode>,
110 slot_schema: Option<Value>,
113}
114
115#[derive(Clone, Debug)]
116pub struct HostNode {
117 kind: NodeKind,
118 pub component: String,
120 component_id: String,
121 operation_name: Option<String>,
122 operation_in_mapping: Option<String>,
123 payload_expr: Value,
124 routing: Routing,
125}
126
127impl HostNode {
128 pub fn component_id(&self) -> &str {
129 &self.component_id
130 }
131
132 pub fn operation_name(&self) -> Option<&str> {
133 self.operation_name.as_deref()
134 }
135
136 pub fn operation_in_mapping(&self) -> Option<&str> {
137 self.operation_in_mapping.as_deref()
138 }
139}
140
141#[derive(Clone, Debug)]
142enum NodeKind {
143 Exec { target_component: String },
144 PackComponent { component_ref: String },
145 ProviderInvoke,
146 FlowCall,
147 BuiltinEmit { kind: EmitKind },
148 BuiltinStateGet,
149 BuiltinStateSet,
150 Wait,
151}
152
153#[derive(Clone, Debug)]
154enum EmitKind {
155 Log,
156 Response,
157 Other(String),
158}
159
160struct ComponentOverrides<'a> {
161 component: Option<&'a str>,
162 operation: Option<&'a str>,
163}
164
165struct ComponentCall {
166 component_ref: String,
167 operation: String,
168 input: Value,
169 config: Value,
170}
171
172impl FlowExecution {
173 fn completed(output: Value) -> Self {
174 Self {
175 output,
176 status: FlowStatus::Completed,
177 }
178 }
179
180 fn waiting(output: Value, wait: FlowWait) -> Self {
181 Self {
182 output,
183 status: FlowStatus::Waiting(Box::new(wait)),
184 }
185 }
186}
187
188impl FlowEngine {
189 pub async fn new(packs: Vec<Arc<PackRuntime>>, config: Arc<HostConfig>) -> Result<Self> {
190 let mut flow_sources: HashMap<FlowKey, usize> = HashMap::new();
191 let mut descriptors = Vec::new();
192 let mut bindings = HashMap::new();
193 for pack in &config.pack_bindings {
194 bindings.insert(pack.pack_id.clone(), pack.flows.clone());
195 }
196 let enforce_bindings = !bindings.is_empty();
197 for (idx, pack) in packs.iter().enumerate() {
198 let pack_id = pack.metadata().pack_id.clone();
199 if enforce_bindings && !bindings.contains_key(&pack_id) {
200 bail!("no gtbind entries found for pack {}", pack_id);
201 }
202 let flows = pack.list_flows().await?;
203 let allowed = bindings.get(&pack_id).map(|flows| {
204 flows
205 .iter()
206 .cloned()
207 .collect::<std::collections::HashSet<_>>()
208 });
209 let mut seen = std::collections::HashSet::new();
210 for flow in flows {
211 if let Some(ref allow) = allowed
212 && !allow.contains(&flow.id)
213 {
214 continue;
215 }
216 seen.insert(flow.id.clone());
217 tracing::info!(
218 flow_id = %flow.id,
219 flow_type = %flow.flow_type,
220 pack_id = %flow.pack_id,
221 pack_index = idx,
222 "registered flow"
223 );
224 if let Ok(flow_ir) = pack.load_flow(&flow.id) {
225 for node in flow_ir.nodes.values() {
226 config
227 .secrets_policy
228 .register_flow_secret_refs(&node.input.mapping);
229 config
230 .secrets_policy
231 .register_flow_secret_refs(&node.output.mapping);
232 }
233 }
234 flow_sources.insert(
235 FlowKey {
236 pack_id: flow.pack_id.clone(),
237 flow_id: flow.id.clone(),
238 },
239 idx,
240 );
241 descriptors.retain(|existing: &FlowDescriptor| {
242 !(existing.id == flow.id && existing.pack_id == flow.pack_id)
243 });
244 descriptors.push(flow);
245 }
246 if let Some(allow) = allowed {
247 let missing = allow.difference(&seen).cloned().collect::<Vec<_>>();
248 if !missing.is_empty() {
249 bail!(
250 "gtbind flow ids missing in pack {}: {}",
251 pack_id,
252 missing.join(", ")
253 );
254 }
255 }
256 }
257
258 let mut flow_map = HashMap::new();
259 for flow in &descriptors {
260 let pack_id = flow.pack_id.clone();
261 if let Some(&pack_idx) = flow_sources.get(&FlowKey {
262 pack_id: pack_id.clone(),
263 flow_id: flow.id.clone(),
264 }) {
265 let pack_clone = Arc::clone(&packs[pack_idx]);
266 let flow_id = flow.id.clone();
267 let task_flow_id = flow_id.clone();
268 match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
269 Ok(Ok(loaded_flow)) => {
270 flow_map.insert(
271 FlowKey {
272 pack_id: pack_id.clone(),
273 flow_id,
274 },
275 HostFlow::from(loaded_flow),
276 );
277 }
278 Ok(Err(err)) => {
279 tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
280 }
281 Err(err) => {
282 tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
283 }
284 }
285 }
286 }
287
288 Ok(Self {
289 packs,
290 flows: descriptors,
291 flow_sources,
292 flow_cache: RwLock::new(flow_map),
293 default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
294 validation: config.validation.clone(),
295 cross_pack_resolver: None,
296 rollout_ids: RolloutIds::default(),
297 })
298 }
299
300 pub fn with_rollout_ids(mut self, rollout_ids: RolloutIds) -> Self {
306 self.rollout_ids = rollout_ids;
307 self
308 }
309
310 pub fn rollout_ids(&self) -> &RolloutIds {
314 &self.rollout_ids
315 }
316
317 pub fn set_cross_pack_resolver(&mut self, resolver: Arc<dyn CrossPackResolver>) {
320 self.cross_pack_resolver = Some(resolver);
321 }
322
323 async fn get_or_load_flow(&self, pack_id: &str, flow_id: &str) -> Result<HostFlow> {
324 let key = FlowKey {
325 pack_id: pack_id.to_string(),
326 flow_id: flow_id.to_string(),
327 };
328 if let Some(flow) = self.flow_cache.read().get(&key).cloned() {
329 return Ok(flow);
330 }
331
332 let pack_idx = *self
333 .flow_sources
334 .get(&key)
335 .with_context(|| format!("flow {pack_id}:{flow_id} not registered"))?;
336 let pack = Arc::clone(&self.packs[pack_idx]);
337 let flow_id_owned = flow_id.to_string();
338 let task_flow_id = flow_id_owned.clone();
339 let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
340 .await
341 .context("failed to join flow metadata task")??;
342 let host_flow = HostFlow::from(flow);
343 self.flow_cache.write().insert(
344 FlowKey {
345 pack_id: pack_id.to_string(),
346 flow_id: flow_id_owned.clone(),
347 },
348 host_flow.clone(),
349 );
350 Ok(host_flow)
351 }
352
353 fn flow_execute_span(&self, ctx: &FlowContext<'_>) -> tracing::Span {
360 let span = tracing::info_span!(
361 "flow.execute",
362 tenant = tracing::field::Empty,
363 flow_id = tracing::field::Empty,
364 node_id = tracing::field::Empty,
365 tool = tracing::field::Empty,
366 action = tracing::field::Empty
367 );
368 annotate_span(
369 &span,
370 &FlowSpanAttributes {
371 tenant: ctx.tenant,
372 flow_id: ctx.flow_id,
373 node_id: ctx.node_id,
374 tool: ctx.tool,
375 action: ctx.action,
376 },
377 );
378 set_flow_context(
379 &span,
380 &self.default_env,
381 ctx.tenant,
382 ctx.flow_id,
383 ctx.node_id,
384 ctx.provider_id,
385 ctx.session_id,
386 ctx.pack_id,
387 &self.rollout_ids,
388 );
389 span
390 }
391
392 pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
393 let span = self.flow_execute_span(&ctx);
394 let retry_config = ctx.retry_config;
395 let original_input = input;
396 let mut ctx = ctx;
397 let metric_tenant = ctx.tenant.to_string();
398 let metric_flow_id = ctx.flow_id.to_string();
399 let started = std::time::Instant::now();
400 let result = async move {
401 let mut attempt = 0u32;
402 loop {
403 attempt += 1;
404 ctx.attempt = attempt;
405 #[cfg(feature = "fault-injection")]
406 {
407 let fault_ctx = FaultContext {
408 pack_id: ctx.pack_id,
409 flow_id: ctx.flow_id,
410 node_id: ctx.node_id,
411 attempt: ctx.attempt,
412 };
413 maybe_fail(FaultPoint::Timeout, fault_ctx)
414 .map_err(|err| anyhow!(err.to_string()))?;
415 }
416 match self.execute_once(&ctx, original_input.clone()).await {
417 Ok(value) => return Ok(value),
418 Err(err) => {
419 if attempt >= retry_config.max_attempts || !should_retry(&err) {
420 if ctx.session_id.is_some() {
425 return Ok(FlowExecution::completed(json!({
426 "metadata": {
427 "error_kind": "flow_execution_failed",
428 "error_message": err.to_string(),
429 "flow_id": ctx.flow_id,
430 }
431 })));
432 }
433 return Err(err);
434 }
435 let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
436 tracing::warn!(
437 tenant = ctx.tenant,
438 flow_id = ctx.flow_id,
439 attempt,
440 max_attempts = retry_config.max_attempts,
441 delay_ms = delay,
442 error = %err,
443 "transient flow execution failure, backing off"
444 );
445 tokio::time::sleep(Duration::from_millis(delay)).await;
446 }
447 }
448 }
449 }
450 .instrument(span)
451 .await;
452 let status = if result.is_ok() { "ok" } else { "err" };
453 let duration_ms = started.elapsed().as_secs_f64() * 1000.0;
454 crate::metrics::record_flow_execution(&metric_tenant, &metric_flow_id, status, duration_ms);
455 result
456 }
457
458 pub async fn resume(
459 &self,
460 ctx: FlowContext<'_>,
461 snapshot: FlowSnapshot,
462 input: Value,
463 ) -> Result<FlowExecution> {
464 if snapshot.pack_id != ctx.pack_id {
465 bail!(
466 "snapshot pack {} does not match requested {}",
467 snapshot.pack_id,
468 ctx.pack_id
469 );
470 }
471 let resume_flow = snapshot
472 .next_flow
473 .clone()
474 .unwrap_or_else(|| snapshot.flow_id.clone());
475 let flow_ir = self.get_or_load_flow(ctx.pack_id, &resume_flow).await?;
476 let mut state = snapshot.state;
477 state.replace_input(input.clone());
487 state.entry = input;
488 let span = self.flow_execute_span(&ctx);
489 self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node), resume_flow)
490 .instrument(span)
491 .await
492 }
493
494 async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
495 let flow_ir = self.get_or_load_flow(ctx.pack_id, ctx.flow_id).await?;
496 let state = ExecutionState::new(input);
497 self.drive_flow(ctx, flow_ir, state, None, ctx.flow_id.to_string())
498 .await
499 }
500
501 async fn drive_flow(
502 &self,
503 ctx: &FlowContext<'_>,
504 mut flow_ir: HostFlow,
505 mut state: ExecutionState,
506 resume_from: Option<String>,
507 mut current_flow_id: String,
508 ) -> Result<FlowExecution> {
509 let mut current = match resume_from {
510 Some(node) => NodeId::from_str(&node)
511 .with_context(|| format!("invalid resume node id `{node}`"))?,
512 None => flow_ir
513 .start
514 .clone()
515 .or_else(|| flow_ir.nodes.keys().next().cloned())
516 .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
517 };
518
519 loop {
520 let step_ctx = FlowContext {
521 tenant: ctx.tenant,
522 pack_id: ctx.pack_id,
523 flow_id: current_flow_id.as_str(),
524 node_id: ctx.node_id,
525 tool: ctx.tool,
526 action: ctx.action,
527 session_id: ctx.session_id,
528 provider_id: ctx.provider_id,
529 retry_config: ctx.retry_config,
530 attempt: ctx.attempt,
531 observer: ctx.observer,
532 mocks: ctx.mocks,
533 };
534 let node = flow_ir
535 .nodes
536 .get(¤t)
537 .with_context(|| format!("node {} not found", current.as_str()))?;
538
539 let payload_template = node.payload_expr.clone();
540 let prev = state
541 .last_output
542 .as_ref()
543 .cloned()
544 .unwrap_or_else(|| Value::Object(JsonMap::new()));
545 let ctx_value = template_context(&state, prev);
546 #[cfg(feature = "fault-injection")]
547 {
548 let fault_ctx = FaultContext {
549 pack_id: ctx.pack_id,
550 flow_id: ctx.flow_id,
551 node_id: Some(current.as_str()),
552 attempt: ctx.attempt,
553 };
554 maybe_fail(FaultPoint::TemplateRender, fault_ctx)
555 .map_err(|err| anyhow!(err.to_string()))?;
556 }
557 let mut payload =
558 render_template_value(&payload_template, &ctx_value, TemplateOptions::default())
559 .context("failed to render node input template")?;
560 let node_id = current.clone();
561
562 if let NodeKind::Exec { target_component } = &node.kind
567 && target_component == SLOT_EXTRACTOR_COMPONENT_ID
568 && let Some(schema) = flow_ir.slot_schema.as_ref()
569 && let Some(map) = payload.as_object_mut()
570 {
571 let input = map.entry("input").or_insert(Value::Null);
572 inject_slot_definitions(input, schema, step_ctx.flow_id, node_id.as_str());
573 }
574
575 let observed_payload = payload.clone();
576 let event = NodeEvent {
577 context: &step_ctx,
578 node_id: node_id.as_str(),
579 node,
580 payload: &observed_payload,
581 };
582 if let Some(observer) = step_ctx.observer {
583 observer.on_node_start(&event);
584 }
585 let dispatch = self
586 .dispatch_node(
587 &step_ctx,
588 node_id.as_str(),
589 node,
590 &mut state,
591 payload,
592 &event,
593 )
594 .await;
595 let DispatchOutcome { output, control } = match dispatch {
596 Ok(outcome) => outcome,
597 Err(err) => {
598 if let Some(observer) = step_ctx.observer {
599 observer.on_node_error(&event, err.as_ref());
600 }
601 return Err(err);
605 }
606 };
607
608 state.nodes.insert(node_id.clone().into(), output.clone());
609 state.last_output = Some(output.payload.clone());
610 if let Some(observer) = step_ctx.observer {
611 observer.on_node_end(&event, &output.payload);
612 }
613
614 match control {
615 NodeControl::Continue => {
616 enum NextDecision {
617 Next(NodeId),
618 End,
619 Wait,
620 }
621 let decision = match &node.routing {
622 Routing::Next { node_id } => NextDecision::Next(node_id.clone()),
623 Routing::End | Routing::Reply => NextDecision::End,
624 Routing::Branch { default, .. } => match default {
625 Some(target) => NextDecision::Next(target.clone()),
626 None => NextDecision::End,
627 },
628 Routing::Custom(raw) => {
629 match evaluate_custom_routing(raw, &output, &state, &flow_ir, &node_id)
630 {
631 CustomRoutingDecision::Next(nid) => NextDecision::Next(nid),
632 CustomRoutingDecision::End => NextDecision::End,
633 CustomRoutingDecision::Wait => NextDecision::Wait,
634 }
635 }
636 };
637
638 match decision {
639 NextDecision::Next(n) => current = n,
640 NextDecision::End => {
641 let nodes_snapshot = state.nodes.clone();
642 let final_output = state.finalize_with(Some(output.payload.clone()));
643 return Ok(FlowExecution::completed(lift_first_node_error_from_nodes(
644 final_output,
645 &nodes_snapshot,
646 )));
647 }
648 NextDecision::Wait => {
649 let mut snapshot_state = state.clone();
654 snapshot_state.clear_egress();
655 let snapshot = FlowSnapshot {
656 pack_id: step_ctx.pack_id.to_string(),
657 flow_id: step_ctx.flow_id.to_string(),
658 next_flow: (current_flow_id != step_ctx.flow_id)
659 .then_some(current_flow_id.clone()),
660 next_node: node_id.as_str().to_string(),
661 state: snapshot_state,
662 };
663 let output_value = state.finalize_with(Some(output.payload.clone()));
664 return Ok(FlowExecution::waiting(
665 output_value,
666 FlowWait {
667 reason: Some(format!(
668 "awaiting user submit at node `{}`",
669 node_id.as_str()
670 )),
671 snapshot,
672 },
673 ));
674 }
675 }
676 }
677 NodeControl::Wait { reason } => {
678 let next: Option<NodeId> = match &node.routing {
679 Routing::Next { node_id } => Some(node_id.clone()),
680 Routing::End | Routing::Reply => None,
681 Routing::Branch { default, .. } => default.clone(),
682 Routing::Custom(raw) => {
683 match evaluate_custom_routing(raw, &output, &state, &flow_ir, &node_id)
684 {
685 CustomRoutingDecision::Next(nid) => Some(nid),
686 CustomRoutingDecision::End | CustomRoutingDecision::Wait => None,
691 }
692 }
693 };
694 let resume_target = next.ok_or_else(|| {
695 anyhow!(
696 "session.wait node {} requires a non-empty route",
697 current.as_str()
698 )
699 })?;
700 let mut snapshot_state = state.clone();
701 snapshot_state.clear_egress();
702 let snapshot = FlowSnapshot {
703 pack_id: step_ctx.pack_id.to_string(),
704 flow_id: step_ctx.flow_id.to_string(),
705 next_flow: (current_flow_id != step_ctx.flow_id)
706 .then_some(current_flow_id.clone()),
707 next_node: resume_target.as_str().to_string(),
708 state: snapshot_state,
709 };
710 let output_value = state.clone().finalize_with(None);
711 return Ok(FlowExecution::waiting(
712 output_value,
713 FlowWait { reason, snapshot },
714 ));
715 }
716 NodeControl::Jump(jump) => {
717 let jump_target = self.apply_jump(&step_ctx, &mut state, jump).await?;
718 flow_ir = jump_target.flow;
719 current_flow_id = jump_target.flow_id;
720 current = jump_target.node_id;
721 }
722 NodeControl::Respond {
723 text,
724 card_cbor,
725 needs_user,
726 } => {
727 let response = json!({
728 "text": text,
729 "card_cbor": card_cbor,
730 "needs_user": needs_user,
731 });
732 state.push_egress(response);
733 let nodes_snapshot = state.nodes.clone();
734 let final_output = state.finalize_with(None);
735 return Ok(FlowExecution::completed(lift_first_node_error_from_nodes(
736 final_output,
737 &nodes_snapshot,
738 )));
739 }
740 }
741 }
742 }
743
744 async fn dispatch_node(
745 &self,
746 ctx: &FlowContext<'_>,
747 node_id: &str,
748 node: &HostNode,
749 state: &mut ExecutionState,
750 mut payload: Value,
751 event: &NodeEvent<'_>,
752 ) -> Result<DispatchOutcome> {
753 inject_card_locale(&mut payload, &state.entry);
754 match &node.kind {
755 NodeKind::Exec { target_component } => self
756 .execute_component_exec(
757 ctx,
758 node_id,
759 node,
760 payload,
761 event,
762 ComponentOverrides {
763 component: Some(target_component.as_str()),
764 operation: node.operation_name.as_deref(),
765 },
766 )
767 .await
768 .and_then(component_dispatch_outcome),
769 NodeKind::PackComponent { component_ref } => self
770 .execute_component_call(ctx, node_id, node, payload, component_ref.as_str(), event)
771 .await
772 .and_then(component_dispatch_outcome),
773 NodeKind::FlowCall => self
774 .execute_flow_call(ctx, payload)
775 .await
776 .map(DispatchOutcome::complete),
777 NodeKind::ProviderInvoke => self
778 .execute_provider_invoke(ctx, node_id, state, payload, event)
779 .await
780 .map(DispatchOutcome::complete),
781 NodeKind::BuiltinEmit { kind } => {
782 match kind {
783 EmitKind::Log | EmitKind::Response => {}
784 EmitKind::Other(component) => {
785 tracing::debug!(%component, "handling emit.* as builtin");
786 }
787 }
788 state.push_egress(payload.clone());
789 Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
790 }
791 NodeKind::BuiltinStateGet => self
792 .execute_state_get(ctx, payload)
793 .await
794 .map(DispatchOutcome::complete),
795 NodeKind::BuiltinStateSet => self
796 .execute_state_set(ctx, payload)
797 .await
798 .map(DispatchOutcome::complete),
799 NodeKind::Wait => {
800 let reason = extract_wait_reason(&payload);
801 Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
802 }
803 }
804 }
805
806 async fn execute_state_get(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
807 let key = Self::extract_state_key_helper(&payload)?;
808 let pack = self.pack_for_flow(ctx)?;
809 let store = pack
810 .state_store_handle()
811 .context("state store is not configured for this runtime")?;
812 let tenant_ctx = self.state_tenant_ctx(ctx)?;
813 let state_key = greentic_state::StateKey::new(&key);
814 let value = store
815 .get_json(
816 &tenant_ctx,
817 crate::storage::state::STATE_PREFIX,
818 &state_key,
819 None,
820 )
821 .with_context(|| format!("state.get failed for key `{key}`"))?;
822 let payload = serde_json::json!({
823 "key": key,
824 "value": value,
825 "found": value.is_some(),
826 });
827 Ok(NodeOutput::new(payload))
828 }
829
830 async fn execute_state_set(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
831 let key = Self::extract_state_key_helper(&payload)?;
832 let value = payload.get("value").cloned().unwrap_or(Value::Null);
833 let pack = self.pack_for_flow(ctx)?;
834 let store = pack
835 .state_store_handle()
836 .context("state store is not configured for this runtime")?;
837 let tenant_ctx = self.state_tenant_ctx(ctx)?;
838 let state_key = greentic_state::StateKey::new(&key);
839 store
840 .set_json(
841 &tenant_ctx,
842 crate::storage::state::STATE_PREFIX,
843 &state_key,
844 None,
845 &value,
846 None,
847 )
848 .with_context(|| format!("state.set failed for key `{key}`"))?;
849 let payload = serde_json::json!({ "key": key, "value": value });
850 Ok(NodeOutput::new(payload))
851 }
852
853 fn pack_for_flow(&self, ctx: &FlowContext<'_>) -> Result<&Arc<PackRuntime>> {
854 let key = FlowKey {
855 pack_id: ctx.pack_id.to_string(),
856 flow_id: ctx.flow_id.to_string(),
857 };
858 let idx = self.flow_sources.get(&key).with_context(|| {
859 format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
860 })?;
861 Ok(&self.packs[*idx])
862 }
863
864 fn extract_state_key_helper(payload: &Value) -> Result<String> {
865 payload
866 .get("key")
867 .and_then(Value::as_str)
868 .map(String::from)
869 .filter(|k| !k.is_empty())
870 .context("state node payload missing required `key` (non-empty string)")
871 }
872
873 fn state_tenant_ctx(&self, ctx: &FlowContext<'_>) -> Result<greentic_types::TenantCtx> {
874 let env = greentic_types::EnvId::from_str(&self.default_env)
875 .with_context(|| format!("invalid env id `{}`", self.default_env))?;
876 let tenant = greentic_types::TenantId::from_str(ctx.tenant)
877 .with_context(|| format!("invalid tenant id `{}`", ctx.tenant))?;
878 Ok(greentic_types::TenantCtx::new(env, tenant))
879 }
880
881 async fn apply_jump(
882 &self,
883 ctx: &FlowContext<'_>,
884 state: &mut ExecutionState,
885 jump: JumpControl,
886 ) -> Result<JumpTarget> {
887 let target_flow = jump.flow.trim();
888 if target_flow.is_empty() {
889 bail!("missing_flow");
890 }
891
892 let flow = self
893 .get_or_load_flow(ctx.pack_id, target_flow)
894 .await
895 .with_context(|| format!("unknown_flow:{target_flow}"))?;
896
897 let target_node = if let Some(node) = jump.node.as_deref() {
898 let parsed = NodeId::from_str(node).with_context(|| format!("unknown_node:{node}"))?;
899 if !flow.nodes.contains_key(&parsed) {
900 bail!("unknown_node:{node}");
901 }
902 parsed
903 } else {
904 flow.start
905 .clone()
906 .or_else(|| flow.nodes.keys().next().cloned())
907 .ok_or_else(|| anyhow!("jump_failed: flow {target_flow} has no start node"))?
908 };
909
910 let max_redirects = jump.max_redirects.unwrap_or(3);
911 if state.redirect_count() >= max_redirects {
912 bail!("redirect_limit");
913 }
914 state.increment_redirect_count();
915 state.replace_input(jump.payload.clone());
916 state.last_output = Some(jump.payload);
917 tracing::info!(
918 flow_id = %ctx.flow_id,
919 target_flow = %target_flow,
920 target_node = %target_node.as_str(),
921 reason = ?jump.reason,
922 redirects = state.redirect_count(),
923 "flow.jump.applied"
924 );
925
926 Ok(JumpTarget {
927 flow_id: target_flow.to_string(),
928 flow,
929 node_id: target_node,
930 })
931 }
932
933 async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
934 #[derive(Deserialize)]
935 struct FlowCallPayload {
936 #[serde(alias = "flow")]
937 flow_id: String,
938 #[serde(default)]
939 input: Value,
940 }
941
942 let call: FlowCallPayload =
943 serde_json::from_value(payload).context("invalid payload for flow.call node")?;
944 if call.flow_id.trim().is_empty() {
945 bail!("flow.call requires a non-empty flow_id");
946 }
947
948 let sub_input = if call.input.is_null() {
949 Value::Null
950 } else {
951 call.input
952 };
953
954 let flow_id_owned = call.flow_id;
955 let action = "flow.call";
956 let sub_ctx = FlowContext {
957 tenant: ctx.tenant,
958 pack_id: ctx.pack_id,
959 flow_id: flow_id_owned.as_str(),
960 node_id: None,
961 tool: ctx.tool,
962 action: Some(action),
963 session_id: ctx.session_id,
964 provider_id: ctx.provider_id,
965 retry_config: ctx.retry_config,
966 attempt: ctx.attempt,
967 observer: ctx.observer,
968 mocks: ctx.mocks,
969 };
970
971 let execution = Box::pin(self.execute(sub_ctx, sub_input))
972 .await
973 .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
974 match execution.status {
975 FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
976 FlowStatus::Waiting(wait) => bail!(
977 "flow.call cannot pause (flow {} waiting {:?})",
978 flow_id_owned,
979 wait.reason
980 ),
981 }
982 }
983
984 async fn execute_component_exec(
985 &self,
986 ctx: &FlowContext<'_>,
987 node_id: &str,
988 node: &HostNode,
989 payload: Value,
990 event: &NodeEvent<'_>,
991 overrides: ComponentOverrides<'_>,
992 ) -> Result<NodeOutput> {
993 #[derive(Deserialize)]
994 struct ComponentPayload {
995 #[serde(default, alias = "component_ref", alias = "component")]
996 component: Option<String>,
997 #[serde(alias = "op")]
998 operation: Option<String>,
999 #[serde(default)]
1000 input: Value,
1001 #[serde(default)]
1002 config: Value,
1003 }
1004
1005 let payload: ComponentPayload =
1006 serde_json::from_value(payload).context("invalid payload for component.exec")?;
1007 let component_ref = overrides
1008 .component
1009 .map(str::to_string)
1010 .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
1011 .with_context(|| "component.exec requires a component_ref")?;
1012 let operation = resolve_component_operation(
1013 node_id,
1014 node.component_id.as_str(),
1015 payload.operation,
1016 overrides.operation,
1017 node.operation_in_mapping.as_deref(),
1018 )?;
1019
1020 let call = ComponentCall {
1021 component_ref,
1022 operation,
1023 input: payload.input,
1024 config: payload.config,
1025 };
1026
1027 self.invoke_component_call(ctx, node_id, call, event).await
1028 }
1029
1030 async fn execute_component_call(
1031 &self,
1032 ctx: &FlowContext<'_>,
1033 node_id: &str,
1034 node: &HostNode,
1035 payload: Value,
1036 component_ref: &str,
1037 event: &NodeEvent<'_>,
1038 ) -> Result<NodeOutput> {
1039 let payload_operation = extract_operation_from_mapping(&payload);
1040 let (input, config) = split_operation_payload(payload);
1041 let operation = resolve_component_operation(
1042 node_id,
1043 node.component_id.as_str(),
1044 payload_operation,
1045 node.operation_name.as_deref(),
1046 node.operation_in_mapping.as_deref(),
1047 )?;
1048 let call = ComponentCall {
1049 component_ref: component_ref.to_string(),
1050 operation,
1051 input,
1052 config,
1053 };
1054 self.invoke_component_call(ctx, node_id, call, event).await
1055 }
1056
1057 async fn invoke_component_call(
1058 &self,
1059 ctx: &FlowContext<'_>,
1060 node_id: &str,
1061 mut call: ComponentCall,
1062 event: &NodeEvent<'_>,
1063 ) -> Result<NodeOutput> {
1064 self.validate_component(ctx, event, &call)?;
1065 let key = FlowKey {
1066 pack_id: ctx.pack_id.to_string(),
1067 flow_id: ctx.flow_id.to_string(),
1068 };
1069 let pack_idx = *self.flow_sources.get(&key).with_context(|| {
1070 format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
1071 })?;
1072 let pack = Arc::clone(&self.packs[pack_idx]);
1073
1074 promote_card_config_to_invocation(&mut call.input, &call.config);
1081
1082 resolve_card_assets(&mut call.input, &pack);
1086
1087 let is_card = is_card_invocation(&call.input);
1095
1096 let input_json = if is_card {
1097 serde_json::to_string(&call.input)?
1098 } else {
1099 let meta = InvocationMeta {
1101 env: &self.default_env,
1102 tenant: ctx.tenant,
1103 flow_id: ctx.flow_id,
1104 node_id: Some(node_id),
1105 provider_id: ctx.provider_id,
1106 session_id: ctx.session_id,
1107 attempt: ctx.attempt,
1108 };
1109 let invocation_envelope =
1110 build_invocation_envelope(meta, call.operation.as_str(), call.input)
1111 .context("build invocation envelope for component call")?;
1112 serde_json::to_string(&invocation_envelope)?
1113 };
1114 let config_json = if call.config.is_null() {
1115 None
1116 } else {
1117 Some(serde_json::to_string(&call.config)?)
1118 };
1119
1120 let exec_ctx = component_exec_ctx(ctx, node_id);
1121 #[cfg(feature = "fault-injection")]
1122 {
1123 let fault_ctx = FaultContext {
1124 pack_id: ctx.pack_id,
1125 flow_id: ctx.flow_id,
1126 node_id: Some(node_id),
1127 attempt: ctx.attempt,
1128 };
1129 maybe_fail(FaultPoint::BeforeComponentCall, fault_ctx)
1130 .map_err(|err| anyhow!(err.to_string()))?;
1131 }
1132 let value = pack
1133 .invoke_component(
1134 call.component_ref.as_str(),
1135 exec_ctx,
1136 call.operation.as_str(),
1137 config_json,
1138 input_json,
1139 )
1140 .await?;
1141 #[cfg(feature = "fault-injection")]
1142 {
1143 let fault_ctx = FaultContext {
1144 pack_id: ctx.pack_id,
1145 flow_id: ctx.flow_id,
1146 node_id: Some(node_id),
1147 attempt: ctx.attempt,
1148 };
1149 maybe_fail(FaultPoint::AfterComponentCall, fault_ctx)
1150 .map_err(|err| anyhow!(err.to_string()))?;
1151 }
1152
1153 if let Some((code, message)) = component_error(&value) {
1154 bail!(
1155 "component {} failed: {}: {}",
1156 call.component_ref,
1157 code,
1158 message
1159 );
1160 }
1161 if let Some((code, message)) = mcp_tool_error(&value) {
1167 bail!(
1168 "component {} returned tool error: {}: {}",
1169 call.component_ref,
1170 code,
1171 message
1172 );
1173 }
1174 Ok(NodeOutput::new(value))
1175 }
1176
1177 async fn execute_provider_invoke(
1178 &self,
1179 ctx: &FlowContext<'_>,
1180 node_id: &str,
1181 state: &ExecutionState,
1182 payload: Value,
1183 event: &NodeEvent<'_>,
1184 ) -> Result<NodeOutput> {
1185 #[derive(Deserialize)]
1186 struct ProviderPayload {
1187 #[serde(default)]
1188 provider_id: Option<String>,
1189 #[serde(default)]
1190 provider_type: Option<String>,
1191 #[serde(default, alias = "operation")]
1192 op: Option<String>,
1193 #[serde(default)]
1194 input: Value,
1195 #[serde(default)]
1196 in_map: Value,
1197 #[serde(default)]
1198 out_map: Value,
1199 #[serde(default)]
1200 err_map: Value,
1201 }
1202
1203 let payload: ProviderPayload =
1204 serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
1205 let op = payload
1206 .op
1207 .as_deref()
1208 .filter(|v| !v.trim().is_empty())
1209 .with_context(|| "provider.invoke requires an op")?
1210 .to_string();
1211
1212 let prev = state
1213 .last_output
1214 .as_ref()
1215 .cloned()
1216 .unwrap_or_else(|| Value::Object(JsonMap::new()));
1217 let base_ctx = template_context(state, prev);
1218
1219 let input_value = if !payload.in_map.is_null() {
1220 let mut ctx_value = base_ctx.clone();
1221 if let Value::Object(ref mut map) = ctx_value {
1222 map.insert("input".into(), payload.input.clone());
1223 map.insert("result".into(), payload.input.clone());
1224 }
1225 render_template_value(
1226 &payload.in_map,
1227 &ctx_value,
1228 TemplateOptions {
1229 allow_pointer: true,
1230 },
1231 )
1232 .context("failed to render provider.invoke in_map")?
1233 } else if !payload.input.is_null() {
1234 payload.input
1235 } else {
1236 Value::Null
1237 };
1238 let input_json = serde_json::to_vec(&input_value)?;
1239
1240 self.validate_tool(
1241 ctx,
1242 event,
1243 payload.provider_id.as_deref(),
1244 payload.provider_type.as_deref(),
1245 &op,
1246 &input_value,
1247 )?;
1248
1249 let key = FlowKey {
1250 pack_id: ctx.pack_id.to_string(),
1251 flow_id: ctx.flow_id.to_string(),
1252 };
1253 let pack_idx = *self.flow_sources.get(&key).with_context(|| {
1254 format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
1255 })?;
1256 let pack = Arc::clone(&self.packs[pack_idx]);
1257 let binding = pack.resolve_provider(
1258 payload.provider_id.as_deref(),
1259 payload.provider_type.as_deref(),
1260 );
1261
1262 if binding.is_err()
1264 && let Some(output) = self.try_invoke_cross_pack_resolver(
1265 payload.provider_id.as_deref(),
1266 payload.provider_type.as_deref(),
1267 &op,
1268 &input_json,
1269 ctx.tenant,
1270 )?
1271 {
1272 return Ok(output);
1273 }
1274
1275 let binding = binding?;
1276 let exec_ctx = component_exec_ctx(ctx, node_id);
1277 #[cfg(feature = "fault-injection")]
1278 {
1279 let fault_ctx = FaultContext {
1280 pack_id: ctx.pack_id,
1281 flow_id: ctx.flow_id,
1282 node_id: Some(node_id),
1283 attempt: ctx.attempt,
1284 };
1285 maybe_fail(FaultPoint::BeforeToolCall, fault_ctx)
1286 .map_err(|err| anyhow!(err.to_string()))?;
1287 }
1288 let provider_metric_id = payload
1289 .provider_id
1290 .as_deref()
1291 .or(payload.provider_type.as_deref())
1292 .unwrap_or("unknown");
1293 let invoke_started = std::time::Instant::now();
1294 let invoke_result = pack
1295 .invoke_provider(&binding, exec_ctx, &op, input_json)
1296 .await;
1297 let invoke_duration_ms = invoke_started.elapsed().as_secs_f64() * 1000.0;
1298 crate::metrics::record_provider_invocation(
1299 ctx.tenant,
1300 provider_metric_id,
1301 &op,
1302 if invoke_result.is_ok() { "ok" } else { "err" },
1303 invoke_duration_ms,
1304 );
1305 let result = invoke_result?;
1306 #[cfg(feature = "fault-injection")]
1307 {
1308 let fault_ctx = FaultContext {
1309 pack_id: ctx.pack_id,
1310 flow_id: ctx.flow_id,
1311 node_id: Some(node_id),
1312 attempt: ctx.attempt,
1313 };
1314 maybe_fail(FaultPoint::AfterToolCall, fault_ctx)
1315 .map_err(|err| anyhow!(err.to_string()))?;
1316 }
1317
1318 let output = if payload.out_map.is_null() {
1319 result
1320 } else {
1321 let mut ctx_value = base_ctx;
1322 if let Value::Object(ref mut map) = ctx_value {
1323 map.insert("input".into(), result.clone());
1324 map.insert("result".into(), result.clone());
1325 }
1326 render_template_value(
1327 &payload.out_map,
1328 &ctx_value,
1329 TemplateOptions {
1330 allow_pointer: true,
1331 },
1332 )
1333 .context("failed to render provider.invoke out_map")?
1334 };
1335 let _ = payload.err_map;
1336 Ok(NodeOutput::new(output))
1337 }
1338
1339 fn try_invoke_cross_pack_resolver(
1340 &self,
1341 provider_id: Option<&str>,
1342 provider_type: Option<&str>,
1343 op: &str,
1344 input_json: &[u8],
1345 tenant: &str,
1346 ) -> Result<Option<NodeOutput>> {
1347 eprintln!(
1348 "[DEBUG] provider.invoke: pack-local failed, has_resolver={}",
1349 self.cross_pack_resolver.is_some()
1350 );
1351 let Some(resolver) = self.cross_pack_resolver.as_ref() else {
1352 return Ok(None);
1353 };
1354 let provider_id = provider_id.unwrap_or("unknown");
1355 tracing::info!(
1356 provider_id,
1357 op = %op,
1358 "provider.invoke: pack-local resolution failed, trying cross-pack resolver"
1359 );
1360 let result_value =
1361 resolver.invoke(provider_id, provider_type, op, input_json, tenant, None)?;
1362 Ok(Some(NodeOutput::new(result_value)))
1363 }
1364
1365 fn validate_component(
1366 &self,
1367 ctx: &FlowContext<'_>,
1368 event: &NodeEvent<'_>,
1369 call: &ComponentCall,
1370 ) -> Result<()> {
1371 if self.validation.mode == ValidationMode::Off {
1372 return Ok(());
1373 }
1374 let mut metadata = JsonMap::new();
1375 metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
1376 if let Some(id) = ctx.session_id {
1377 metadata.insert("session".to_string(), json!({ "id": id }));
1378 }
1379 let envelope = json!({
1380 "component_id": call.component_ref,
1381 "operation": call.operation,
1382 "input": call.input,
1383 "config": call.config,
1384 "metadata": Value::Object(metadata),
1385 });
1386 let issues = validate_component_envelope(&envelope);
1387 self.report_validation(ctx, event, "component", issues)
1388 }
1389
1390 fn validate_tool(
1391 &self,
1392 ctx: &FlowContext<'_>,
1393 event: &NodeEvent<'_>,
1394 provider_id: Option<&str>,
1395 provider_type: Option<&str>,
1396 operation: &str,
1397 input: &Value,
1398 ) -> Result<()> {
1399 if self.validation.mode == ValidationMode::Off {
1400 return Ok(());
1401 }
1402 let tool_id = provider_id.or(provider_type).unwrap_or("provider.invoke");
1403 let mut metadata = JsonMap::new();
1404 metadata.insert("tenant_id".to_string(), json!(ctx.tenant));
1405 if let Some(id) = ctx.session_id {
1406 metadata.insert("session".to_string(), json!({ "id": id }));
1407 }
1408 let envelope = json!({
1409 "tool_id": tool_id,
1410 "operation": operation,
1411 "input": input,
1412 "metadata": Value::Object(metadata),
1413 });
1414 let issues = validate_tool_envelope(&envelope);
1415 self.report_validation(ctx, event, "tool", issues)
1416 }
1417
1418 fn report_validation(
1419 &self,
1420 ctx: &FlowContext<'_>,
1421 event: &NodeEvent<'_>,
1422 kind: &str,
1423 issues: Vec<ValidationIssue>,
1424 ) -> Result<()> {
1425 if issues.is_empty() {
1426 return Ok(());
1427 }
1428 if let Some(observer) = ctx.observer {
1429 observer.on_validation(event, &issues);
1430 }
1431 match self.validation.mode {
1432 ValidationMode::Warn => {
1433 tracing::warn!(
1434 tenant = ctx.tenant,
1435 flow_id = ctx.flow_id,
1436 node_id = event.node_id,
1437 kind,
1438 issues = ?issues,
1439 "invocation envelope validation issues"
1440 );
1441 Ok(())
1442 }
1443 ValidationMode::Error => {
1444 tracing::error!(
1445 tenant = ctx.tenant,
1446 flow_id = ctx.flow_id,
1447 node_id = event.node_id,
1448 kind,
1449 issues = ?issues,
1450 "invocation envelope validation failed"
1451 );
1452 bail!("invocation_validation_failed");
1453 }
1454 ValidationMode::Off => Ok(()),
1455 }
1456 }
1457
1458 pub fn flows(&self) -> &[FlowDescriptor] {
1459 &self.flows
1460 }
1461
1462 pub fn flow_by_key(&self, pack_id: &str, flow_id: &str) -> Option<&FlowDescriptor> {
1463 self.flows
1464 .iter()
1465 .find(|descriptor| descriptor.pack_id == pack_id && descriptor.id == flow_id)
1466 }
1467
1468 pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
1469 let mut matches = self
1470 .flows
1471 .iter()
1472 .filter(|descriptor| descriptor.flow_type == flow_type);
1473 let first = matches.next()?;
1474 if matches.next().is_some() {
1475 return None;
1476 }
1477 Some(first)
1478 }
1479
1480 pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
1481 let mut matches = self
1482 .flows
1483 .iter()
1484 .filter(|descriptor| descriptor.id == flow_id);
1485 let first = matches.next()?;
1486 if matches.next().is_some() {
1487 return None;
1488 }
1489 Some(first)
1490 }
1491}
1492
1493pub trait ExecutionObserver: Send + Sync {
1494 fn on_node_start(&self, event: &NodeEvent<'_>);
1495 fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
1496 fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
1497 fn on_validation(&self, _event: &NodeEvent<'_>, _issues: &[ValidationIssue]) {}
1498}
1499
1500pub struct NodeEvent<'a> {
1501 pub context: &'a FlowContext<'a>,
1502 pub node_id: &'a str,
1503 pub node: &'a HostNode,
1504 pub payload: &'a Value,
1505}
1506
1507#[derive(Clone, Debug, Serialize, Deserialize)]
1508pub struct ExecutionState {
1509 #[serde(default)]
1510 entry: Value,
1511 #[serde(default)]
1512 input: Value,
1513 #[serde(default)]
1514 nodes: HashMap<String, NodeOutput>,
1515 #[serde(default)]
1516 egress: Vec<Value>,
1517 #[serde(default, skip_serializing_if = "Option::is_none")]
1518 last_output: Option<Value>,
1519 #[serde(default)]
1520 redirect_count: u32,
1521}
1522
1523impl ExecutionState {
1524 fn new(input: Value) -> Self {
1525 Self {
1526 entry: input.clone(),
1527 input,
1528 nodes: HashMap::new(),
1529 egress: Vec::new(),
1530 last_output: None,
1531 redirect_count: 0,
1532 }
1533 }
1534
1535 #[allow(dead_code)]
1539 fn ensure_entry(&mut self) {
1540 if self.entry.is_null() {
1541 self.entry = self.input.clone();
1542 }
1543 }
1544
1545 fn context(&self) -> Value {
1546 let mut nodes = JsonMap::new();
1547 for (id, output) in &self.nodes {
1548 nodes.insert(
1549 id.clone(),
1550 json!({
1551 "ok": output.ok,
1552 "payload": output.payload.clone(),
1553 "meta": output.meta.clone(),
1554 }),
1555 );
1556 }
1557 json!({
1558 "entry": self.entry.clone(),
1559 "input": self.input.clone(),
1560 "nodes": nodes,
1561 "redirect_count": self.redirect_count,
1562 })
1563 }
1564
1565 fn outputs_map(&self) -> JsonMap<String, Value> {
1566 let mut outputs = JsonMap::new();
1567 for (id, output) in &self.nodes {
1568 outputs.insert(id.clone(), output.payload.clone());
1569 }
1570 outputs
1571 }
1572 fn push_egress(&mut self, payload: Value) {
1573 self.egress.push(payload);
1574 }
1575
1576 fn replace_input(&mut self, input: Value) {
1577 self.input = input;
1578 }
1579
1580 fn clear_egress(&mut self) {
1581 self.egress.clear();
1582 }
1583
1584 fn redirect_count(&self) -> u32 {
1585 self.redirect_count
1586 }
1587
1588 fn increment_redirect_count(&mut self) {
1589 self.redirect_count = self.redirect_count.saturating_add(1);
1590 }
1591
1592 fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
1593 if self.egress.is_empty() {
1594 return final_payload.unwrap_or(Value::Null);
1595 }
1596 let mut emitted = std::mem::take(&mut self.egress);
1597 if let Some(value) = final_payload {
1598 match value {
1599 Value::Null => {}
1600 Value::Array(items) => emitted.extend(items),
1601 other => emitted.push(other),
1602 }
1603 }
1604 Value::Array(emitted)
1605 }
1606}
1607
1608#[derive(Clone, Debug, Serialize, Deserialize)]
1609struct NodeOutput {
1610 ok: bool,
1611 payload: Value,
1612 meta: Value,
1613}
1614
1615impl NodeOutput {
1616 fn new(payload: Value) -> Self {
1617 Self {
1618 ok: true,
1619 payload,
1620 meta: Value::Null,
1621 }
1622 }
1623
1624 #[allow(dead_code)]
1628 fn with_error(node_id: &str, err: &(dyn std::error::Error + 'static)) -> Self {
1629 Self {
1630 ok: false,
1631 payload: Value::Null,
1632 meta: json!({
1633 "error": {
1634 "kind": "flow_node_failed",
1635 "message": err.to_string(),
1636 "node_id": node_id,
1637 }
1638 }),
1639 }
1640 }
1641}
1642
1643struct DispatchOutcome {
1644 output: NodeOutput,
1645 control: NodeControl,
1646}
1647
1648impl DispatchOutcome {
1649 fn complete(output: NodeOutput) -> Self {
1650 Self {
1651 output,
1652 control: NodeControl::Continue,
1653 }
1654 }
1655
1656 fn wait(output: NodeOutput, reason: Option<String>) -> Self {
1657 Self {
1658 output,
1659 control: NodeControl::Wait { reason },
1660 }
1661 }
1662
1663 fn with_control(output: NodeOutput, control: NodeControl) -> Self {
1664 Self { output, control }
1665 }
1666}
1667
1668#[derive(Clone, Debug)]
1669enum NodeControl {
1670 Continue,
1671 Wait {
1672 reason: Option<String>,
1673 },
1674 Jump(JumpControl),
1675 Respond {
1676 text: Option<String>,
1677 card_cbor: Option<Vec<u8>>,
1678 needs_user: Option<bool>,
1679 },
1680}
1681
1682#[derive(Clone, Debug)]
1683struct JumpControl {
1684 flow: String,
1685 node: Option<String>,
1686 payload: Value,
1687 hints: Value,
1688 max_redirects: Option<u32>,
1689 reason: Option<String>,
1690}
1691
1692#[derive(Clone, Debug)]
1693struct JumpTarget {
1694 flow_id: String,
1695 flow: HostFlow,
1696 node_id: NodeId,
1697}
1698
1699impl NodeOutput {
1700 fn with_meta(payload: Value, meta: Value) -> Self {
1701 Self {
1702 ok: true,
1703 payload,
1704 meta,
1705 }
1706 }
1707}
1708
1709fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
1710 ComponentExecCtx {
1711 tenant: ComponentTenantCtx {
1712 tenant: ctx.tenant.to_string(),
1713 team: None,
1714 user: ctx.provider_id.map(str::to_string),
1715 trace_id: None,
1716 i18n_id: None,
1717 correlation_id: ctx.session_id.map(str::to_string),
1718 deadline_unix_ms: None,
1719 attempt: ctx.attempt,
1720 idempotency_key: ctx.session_id.map(str::to_string),
1721 },
1722 i18n_id: None,
1723 flow_id: ctx.flow_id.to_string(),
1724 node_id: Some(node_id.to_string()),
1725 }
1726}
1727
1728fn component_error(value: &Value) -> Option<(String, String)> {
1729 let obj = value.as_object()?;
1730 let ok = obj.get("ok").and_then(Value::as_bool)?;
1731 if ok {
1732 return None;
1733 }
1734 let err = obj.get("error")?.as_object()?;
1735 let code = err
1736 .get("code")
1737 .and_then(Value::as_str)
1738 .unwrap_or("component_error");
1739 let message = err
1740 .get("message")
1741 .and_then(Value::as_str)
1742 .unwrap_or("component reported error");
1743 Some((code.to_string(), message.to_string()))
1744}
1745
1746fn mcp_tool_error(value: &Value) -> Option<(String, String)> {
1751 let obj = value.as_object()?;
1752 if obj.contains_key("result") {
1754 return None;
1755 }
1756 let err = obj.get("error")?.as_object()?;
1757 let code = err
1758 .get("code")
1759 .and_then(Value::as_str)
1760 .unwrap_or("tool_error");
1761 let raw_message = err
1762 .get("message")
1763 .and_then(Value::as_str)
1764 .unwrap_or("tool returned an error");
1765 let status = err.get("status").and_then(Value::as_u64);
1766 let message = match status {
1767 Some(s) => format!("{raw_message} (status {s})"),
1768 None => raw_message.to_string(),
1769 };
1770 Some((code.to_string(), message))
1771}
1772
1773fn extract_wait_reason(payload: &Value) -> Option<String> {
1774 match payload {
1775 Value::String(s) => Some(s.clone()),
1776 Value::Object(map) => map
1777 .get("reason")
1778 .and_then(Value::as_str)
1779 .map(|value| value.to_string()),
1780 _ => None,
1781 }
1782}
1783
1784fn component_dispatch_outcome(output: NodeOutput) -> Result<DispatchOutcome> {
1785 if let Some(control) = parse_component_control(&output.payload)? {
1786 return Ok(match control {
1787 NodeControl::Jump(jump) => {
1788 let adjusted = NodeOutput::with_meta(jump.payload.clone(), jump.hints.clone());
1789 DispatchOutcome::with_control(adjusted, NodeControl::Jump(jump))
1790 }
1791 NodeControl::Respond {
1792 text,
1793 card_cbor,
1794 needs_user,
1795 } => DispatchOutcome::with_control(
1796 output,
1797 NodeControl::Respond {
1798 text,
1799 card_cbor,
1800 needs_user,
1801 },
1802 ),
1803 other => DispatchOutcome::with_control(output, other),
1804 });
1805 }
1806 Ok(DispatchOutcome::complete(output))
1807}
1808
1809fn parse_component_control(payload: &Value) -> Result<Option<NodeControl>> {
1810 let Value::Object(map) = payload else {
1811 return Ok(None);
1812 };
1813 let Some(control_value) = map.get("greentic_control") else {
1814 return Ok(None);
1815 };
1816 let control = control_value
1817 .as_object()
1818 .ok_or_else(|| anyhow!("jump_failed: greentic_control must be an object"))?;
1819 let action = control
1820 .get("action")
1821 .and_then(Value::as_str)
1822 .ok_or_else(|| anyhow!("jump_failed: greentic_control.action is required"))?;
1823 let version = control
1824 .get("v")
1825 .and_then(Value::as_u64)
1826 .ok_or_else(|| anyhow!("jump_failed: greentic_control.v is required"))?;
1827 if version != 1 {
1828 bail!("jump_failed: unsupported greentic_control.v={version}");
1829 }
1830
1831 match action {
1832 "jump" => {
1833 let flow = control
1834 .get("flow")
1835 .and_then(Value::as_str)
1836 .map(str::trim)
1837 .filter(|value| !value.is_empty())
1838 .ok_or_else(|| anyhow!("jump_failed: jump flow is required"))?
1839 .to_string();
1840 let node = control
1841 .get("node")
1842 .and_then(Value::as_str)
1843 .map(str::trim)
1844 .filter(|value| !value.is_empty())
1845 .map(str::to_string);
1846 let payload = control.get("payload").cloned().unwrap_or(Value::Null);
1847 let hints = control.get("hints").cloned().unwrap_or(Value::Null);
1848 let max_redirects = control
1849 .get("max_redirects")
1850 .and_then(Value::as_u64)
1851 .and_then(|value| u32::try_from(value).ok());
1852 let reason = control
1853 .get("reason")
1854 .and_then(Value::as_str)
1855 .map(str::to_string);
1856 Ok(Some(NodeControl::Jump(JumpControl {
1857 flow,
1858 node,
1859 payload,
1860 hints,
1861 max_redirects,
1862 reason,
1863 })))
1864 }
1865 "respond" => {
1866 let text = control
1867 .get("text")
1868 .and_then(Value::as_str)
1869 .map(str::to_string);
1870 let card_cbor = control
1871 .get("card_cbor")
1872 .and_then(Value::as_array)
1873 .map(|bytes| {
1874 bytes
1875 .iter()
1876 .filter_map(Value::as_u64)
1877 .filter_map(|value| u8::try_from(value).ok())
1878 .collect::<Vec<_>>()
1879 });
1880 let needs_user = control.get("needs_user").and_then(Value::as_bool);
1881 Ok(Some(NodeControl::Respond {
1882 text,
1883 card_cbor,
1884 needs_user,
1885 }))
1886 }
1887 _ => Ok(None),
1888 }
1889}
1890
1891fn template_context(state: &ExecutionState, prev: Value) -> Value {
1892 let entry = if state.entry.is_null() {
1893 Value::Object(JsonMap::new())
1894 } else {
1895 state.entry.clone()
1896 };
1897 let mut ctx = JsonMap::new();
1898 ctx.insert("entry".into(), entry.clone());
1899 ctx.insert("in".into(), entry); ctx.insert("prev".into(), prev);
1901 ctx.insert("node".into(), Value::Object(state.outputs_map()));
1902 ctx.insert("state".into(), state.context());
1903 Value::Object(ctx)
1904}
1905
1906impl From<Flow> for HostFlow {
1907 fn from(value: Flow) -> Self {
1908 let mut nodes = IndexMap::new();
1909 for (id, node) in value.nodes {
1910 nodes.insert(id.clone(), HostNode::from(node));
1911 }
1912 let start = value
1913 .entrypoints
1914 .get("default")
1915 .and_then(Value::as_str)
1916 .and_then(|id| NodeId::from_str(id).ok())
1917 .or_else(|| nodes.keys().next().cloned());
1918 let slot_schema = value
1922 .metadata
1923 .extra
1924 .get(SLOT_SCHEMA_METADATA_KEY)
1925 .filter(|v| !v.is_null())
1926 .cloned();
1927 Self {
1928 id: value.id.as_str().to_string(),
1929 start,
1930 nodes,
1931 slot_schema,
1932 }
1933 }
1934}
1935
1936impl From<Node> for HostNode {
1937 fn from(node: Node) -> Self {
1938 let full_ref = node.component.id.as_str().to_string();
1939 let is_builtin = full_ref.starts_with("component.exec")
1942 || full_ref.starts_with("flow.")
1943 || full_ref.starts_with("emit.")
1944 || full_ref.starts_with("session.")
1945 || full_ref.starts_with("provider.");
1946 let (component_ref, raw_operation) = if node.component.operation.is_some() || is_builtin {
1947 (full_ref, node.component.operation.clone())
1948 } else if let Some(dot) = full_ref.rfind('.') {
1949 let comp = full_ref[..dot].to_string();
1950 let op = full_ref[dot + 1..].to_string();
1951 (comp, Some(op))
1952 } else {
1953 (full_ref, None)
1954 };
1955 let operation_in_mapping = extract_operation_from_mapping(&node.input.mapping);
1956 let operation_is_component_exec = raw_operation.as_deref() == Some("component.exec");
1957 let operation_is_emit = raw_operation
1958 .as_deref()
1959 .map(|op| op.starts_with("emit."))
1960 .unwrap_or(false);
1961 let is_component_exec = component_ref == "component.exec" || operation_is_component_exec;
1962
1963 let kind = if is_component_exec {
1964 let target = if component_ref == "component.exec" {
1965 if let Some(op) = raw_operation
1966 .as_deref()
1967 .filter(|op| op.starts_with("emit."))
1968 {
1969 op.to_string()
1970 } else {
1971 extract_target_component(&node.input.mapping)
1972 .unwrap_or_else(|| "component.exec".to_string())
1973 }
1974 } else {
1975 extract_target_component(&node.input.mapping)
1976 .unwrap_or_else(|| component_ref.clone())
1977 };
1978 if target.starts_with("emit.") {
1979 NodeKind::BuiltinEmit {
1980 kind: emit_kind_from_ref(&target),
1981 }
1982 } else {
1983 NodeKind::Exec {
1984 target_component: target,
1985 }
1986 }
1987 } else if operation_is_emit {
1988 NodeKind::BuiltinEmit {
1989 kind: emit_kind_from_ref(raw_operation.as_deref().unwrap_or("emit.log")),
1990 }
1991 } else {
1992 match component_ref.as_str() {
1993 "flow.call" => NodeKind::FlowCall,
1994 "provider.invoke" => NodeKind::ProviderInvoke,
1995 "session.wait" => NodeKind::Wait,
1996 "state.get" => NodeKind::BuiltinStateGet,
1997 "state.set" => NodeKind::BuiltinStateSet,
1998 comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
1999 kind: emit_kind_from_ref(comp),
2000 },
2001 other => NodeKind::PackComponent {
2002 component_ref: other.to_string(),
2003 },
2004 }
2005 };
2006 let component_label = match &kind {
2007 NodeKind::Exec { .. } => "component.exec".to_string(),
2008 NodeKind::PackComponent { component_ref } => component_ref.clone(),
2009 NodeKind::ProviderInvoke => "provider.invoke".to_string(),
2010 NodeKind::FlowCall => "flow.call".to_string(),
2011 NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
2012 NodeKind::BuiltinStateGet => "state.get".to_string(),
2013 NodeKind::BuiltinStateSet => "state.set".to_string(),
2014 NodeKind::Wait => "session.wait".to_string(),
2015 };
2016 let operation_name = if is_component_exec && operation_is_component_exec {
2017 None
2018 } else {
2019 raw_operation.clone()
2020 };
2021 let payload_expr = match kind {
2022 NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
2023 _ => node.input.mapping.clone(),
2024 };
2025 Self {
2026 kind,
2027 component: component_label,
2028 component_id: if is_component_exec {
2029 "component.exec".to_string()
2030 } else {
2031 component_ref
2032 },
2033 operation_name,
2034 operation_in_mapping,
2035 payload_expr,
2036 routing: node.routing,
2037 }
2038 }
2039}
2040
2041fn extract_target_component(payload: &Value) -> Option<String> {
2042 match payload {
2043 Value::Object(map) => map
2044 .get("component")
2045 .or_else(|| map.get("component_ref"))
2046 .and_then(Value::as_str)
2047 .map(|s| s.to_string()),
2048 _ => None,
2049 }
2050}
2051
2052fn extract_operation_from_mapping(payload: &Value) -> Option<String> {
2053 match payload {
2054 Value::Object(map) => map
2055 .get("operation")
2056 .or_else(|| map.get("op"))
2057 .and_then(Value::as_str)
2058 .map(str::trim)
2059 .filter(|value| !value.is_empty())
2060 .map(|value| value.to_string()),
2061 _ => None,
2062 }
2063}
2064
2065fn extract_emit_payload(payload: &Value) -> Value {
2066 if let Value::Object(map) = payload {
2067 if let Some(input) = map.get("input") {
2068 return input.clone();
2069 }
2070 if let Some(inner) = map.get("payload") {
2071 return inner.clone();
2072 }
2073 }
2074 payload.clone()
2075}
2076
2077fn split_operation_payload(payload: Value) -> (Value, Value) {
2078 if let Value::Object(mut map) = payload.clone()
2079 && map.contains_key("input")
2080 {
2081 let input = map.remove("input").unwrap_or(Value::Null);
2082 let config = map.remove("config").unwrap_or(Value::Null);
2083 let legacy_only = map.keys().all(|key| {
2084 matches!(
2085 key.as_str(),
2086 "operation" | "op" | "component" | "component_ref"
2087 )
2088 });
2089 if legacy_only {
2090 return (input, config);
2091 }
2092 }
2093 (payload, Value::Null)
2094}
2095
2096fn resolve_component_operation(
2097 node_id: &str,
2098 component_label: &str,
2099 payload_operation: Option<String>,
2100 operation_override: Option<&str>,
2101 operation_in_mapping: Option<&str>,
2102) -> Result<String> {
2103 if let Some(op) = operation_override
2104 .map(str::trim)
2105 .filter(|value| !value.is_empty())
2106 {
2107 return Ok(op.to_string());
2108 }
2109
2110 if let Some(op) = payload_operation
2111 .as_deref()
2112 .map(str::trim)
2113 .filter(|value| !value.is_empty())
2114 {
2115 return Ok(op.to_string());
2116 }
2117
2118 let mut message = format!(
2119 "missing operation for node `{}` (component `{}`); expected node.component.operation to be set",
2120 node_id, component_label,
2121 );
2122 if let Some(found) = operation_in_mapping {
2123 message.push_str(&format!(
2124 ". Found operation in input.mapping (`{}`) but this is not used; pack compiler must preserve node.component.operation.",
2125 found
2126 ));
2127 }
2128 bail!(message);
2129}
2130
2131fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
2132 match component_ref {
2133 "emit.log" => EmitKind::Log,
2134 "emit.response" => EmitKind::Response,
2135 other => EmitKind::Other(other.to_string()),
2136 }
2137}
2138
2139fn emit_ref_from_kind(kind: &EmitKind) -> String {
2140 match kind {
2141 EmitKind::Log => "emit.log".to_string(),
2142 EmitKind::Response => "emit.response".to_string(),
2143 EmitKind::Other(other) => other.clone(),
2144 }
2145}
2146
2147fn is_card_invocation(input: &Value) -> bool {
2150 if let Value::Object(map) = input {
2151 return map.contains_key("card_source") || map.contains_key("card_spec");
2152 }
2153 false
2154}
2155
2156fn promote_card_config_to_invocation(input: &mut Value, config: &Value) {
2168 if is_card_invocation(input) {
2169 return;
2170 }
2171
2172 let cfg_map = card_defaults_source(input, config);
2173 let Some(cfg) = cfg_map else { return };
2174
2175 let default_asset = cfg
2176 .get("default_card_asset")
2177 .and_then(Value::as_str)
2178 .map(str::trim)
2179 .filter(|value| !value.is_empty())
2180 .map(str::to_string);
2181 let default_inline = cfg
2182 .get("default_card_inline")
2183 .filter(|value| value.is_object() || value.is_array())
2184 .cloned();
2185 let default_source = cfg
2186 .get("default_source")
2187 .and_then(Value::as_str)
2188 .map(str::trim)
2189 .filter(|value| !value.is_empty())
2190 .map(str::to_lowercase);
2191
2192 if default_asset.is_none() && default_inline.is_none() && default_source.is_none() {
2193 return;
2194 }
2195
2196 let card_source = default_source.unwrap_or_else(|| {
2197 if default_inline.is_some() {
2198 "inline".to_string()
2199 } else {
2200 "asset".to_string()
2201 }
2202 });
2203
2204 let mut card_spec = serde_json::Map::new();
2205 match card_source.as_str() {
2206 "asset" => {
2207 if let Some(path) = default_asset {
2208 card_spec.insert("asset_path".into(), Value::String(path));
2209 }
2210 }
2211 "inline" => {
2212 if let Some(inline) = default_inline {
2213 card_spec.insert("inline_json".into(), inline);
2214 }
2215 }
2216 _ => {}
2217 }
2218
2219 if !matches!(input, Value::Object(_)) {
2220 *input = Value::Object(serde_json::Map::new());
2221 }
2222 if let Value::Object(map) = input {
2223 map.insert("card_source".into(), Value::String(card_source));
2224 map.insert("card_spec".into(), Value::Object(card_spec));
2225 }
2226}
2227
2228fn card_defaults_source<'a>(
2233 input: &'a Value,
2234 config: &'a Value,
2235) -> Option<&'a serde_json::Map<String, Value>> {
2236 if let Value::Object(map) = config {
2237 return Some(map);
2238 }
2239 if let Value::Object(map) = input
2240 && let Some(Value::Object(nested)) = map.get("config")
2241 {
2242 return Some(nested);
2243 }
2244 None
2245}
2246
2247fn inject_card_locale(payload: &mut Value, entry: &Value) {
2248 if !is_card_invocation(payload) {
2249 return;
2250 }
2251 let Value::Object(map) = payload else { return };
2252 if map.contains_key("locale") {
2253 return;
2254 }
2255 let locale = entry
2256 .pointer("/input/metadata/locale")
2257 .or_else(|| entry.pointer("/metadata/locale"))
2258 .and_then(Value::as_str);
2259 if let Some(locale) = locale {
2260 map.insert("locale".into(), Value::String(locale.to_string()));
2261 }
2262}
2263
2264fn inject_slot_definitions(input: &mut Value, slot_schema: &Value, flow_id: &str, node_id: &str) {
2270 if input.is_null() {
2271 *input = Value::Object(serde_json::Map::new());
2272 }
2273 let Some(map) = input.as_object_mut() else {
2274 tracing::warn!(
2275 flow_id,
2276 node_id,
2277 "slot-extractor input is not an object; cannot inject slot_definitions"
2278 );
2279 return;
2280 };
2281 if map.contains_key("slot_definitions") {
2282 return;
2283 }
2284 let slot_count = slot_schema.as_array().map_or(0, Vec::len);
2285 tracing::debug!(
2286 flow_id,
2287 slot_count,
2288 "injecting flow-level slot_schema as slot_definitions into slot-extractor input"
2289 );
2290 map.insert("slot_definitions".to_string(), slot_schema.clone());
2291}
2292
2293fn resolve_card_assets(input: &mut Value, pack: &crate::pack::PackRuntime) {
2300 resolve_card_spec_asset(input, pack);
2301
2302 if let Value::Object(map) = input
2305 && let Some(Value::Object(call)) = map.get_mut("call")
2306 && let Some(payload) = call.get_mut("payload")
2307 {
2308 resolve_card_spec_asset(payload, pack);
2309 }
2310}
2311
2312fn resolve_card_spec_asset(value: &mut Value, pack: &crate::pack::PackRuntime) {
2314 let Value::Object(map) = value else { return };
2315
2316 let is_asset = map
2317 .get("card_source")
2318 .and_then(Value::as_str)
2319 .map(|s| s.eq_ignore_ascii_case("asset"))
2320 .unwrap_or(false);
2321 if !is_asset {
2322 return;
2323 }
2324
2325 let asset_path = map
2326 .get("card_spec")
2327 .and_then(|spec| spec.get("asset_path"))
2328 .and_then(Value::as_str)
2329 .map(str::to_string);
2330
2331 let Some(asset_path) = asset_path else { return };
2332
2333 match pack.read_asset(&asset_path) {
2334 Ok(bytes) => {
2335 let card_json: Value = match serde_json::from_slice(&bytes) {
2336 Ok(v) => v,
2337 Err(err) => {
2338 tracing::warn!(
2339 asset_path,
2340 %err,
2341 "failed to parse card asset as JSON; leaving as asset reference"
2342 );
2343 return;
2344 }
2345 };
2346 tracing::debug!(asset_path, "pre-resolved card asset to inline_json");
2347 map.insert("card_source".into(), Value::String("inline".into()));
2348 if let Some(Value::Object(spec)) = map.get_mut("card_spec") {
2349 spec.insert("inline_json".into(), card_json);
2350 spec.remove("asset_path");
2351 }
2352 }
2353 Err(err) => {
2354 tracing::warn!(
2355 asset_path,
2356 %err,
2357 "card asset not found in pack; leaving as asset reference"
2358 );
2359 }
2360 }
2361
2362 let configured_bundle_path = map
2369 .get("card_spec")
2370 .and_then(|spec| spec.get("i18n_bundle_path"))
2371 .and_then(Value::as_str)
2372 .map(|s| s.trim().trim_end_matches('/').to_string())
2373 .filter(|s| !s.is_empty());
2374
2375 let bundle_path = configured_bundle_path
2376 .clone()
2377 .unwrap_or_else(|| "assets/i18n".to_string());
2378
2379 let i18n_entries = load_i18n_bundle_entries(&bundle_path, |path| pack.read_asset(path));
2380
2381 if !i18n_entries.is_empty() {
2382 let locale_keys: Vec<_> = i18n_entries.keys().cloned().collect();
2383 if let Some(Value::Object(spec)) = map.get_mut("card_spec") {
2384 spec.insert("i18n_inline".into(), Value::Object(i18n_entries));
2385 if configured_bundle_path.is_some() {
2386 tracing::info!(%bundle_path, ?locale_keys, "pre-resolved i18n bundle into card_spec.i18n_inline");
2387 } else {
2388 tracing::info!(%bundle_path, ?locale_keys, "auto-discovered i18n bundle and inlined into card_spec.i18n_inline");
2389 }
2390 }
2391 }
2392}
2393
2394fn load_i18n_bundle_entries<F>(bundle_path: &str, mut read_asset: F) -> JsonMap<String, Value>
2395where
2396 F: FnMut(&str) -> Result<Vec<u8>>,
2397{
2398 let mut i18n_entries = JsonMap::new();
2399
2400 if bundle_path.ends_with(".json") {
2401 if let Ok(bytes) = read_asset(bundle_path)
2402 && let Ok(Value::Object(entries)) = serde_json::from_slice::<Value>(&bytes)
2403 {
2404 i18n_entries.insert("en".to_string(), Value::Object(entries));
2405 }
2406 return i18n_entries;
2407 }
2408
2409 let manifest_path = format!("{bundle_path}/_manifest.json");
2410 let locale_codes: Vec<String> = read_asset(&manifest_path)
2411 .ok()
2412 .and_then(|bytes| serde_json::from_slice::<Value>(&bytes).ok())
2413 .and_then(|value| {
2414 let locales = value
2415 .get("locales")
2416 .and_then(Value::as_array)
2417 .cloned()
2418 .or_else(|| value.as_array().cloned());
2419 locales.map(|items| {
2420 items
2421 .iter()
2422 .filter_map(Value::as_str)
2423 .map(String::from)
2424 .collect()
2425 })
2426 })
2427 .unwrap_or_default();
2428
2429 tracing::info!(%bundle_path, ?locale_codes, "i18n manifest discovered locales");
2430
2431 for locale in &locale_codes {
2432 let candidate = format!("{bundle_path}/{locale}.json");
2433 if let Ok(bytes) = read_asset(&candidate)
2434 && let Ok(Value::Object(entries)) = serde_json::from_slice::<Value>(&bytes)
2435 {
2436 i18n_entries.insert(locale.clone(), Value::Object(entries));
2437 }
2438 }
2439 if !i18n_entries.contains_key("en") {
2440 let en_path = format!("{bundle_path}/en.json");
2441 if let Ok(bytes) = read_asset(&en_path)
2442 && let Ok(Value::Object(entries)) = serde_json::from_slice::<Value>(&bytes)
2443 {
2444 i18n_entries.insert("en".to_string(), Value::Object(entries));
2445 }
2446 }
2447
2448 i18n_entries
2449}
2450
2451#[derive(Debug)]
2460pub(crate) enum CustomRoutingDecision {
2461 Next(NodeId),
2462 End,
2463 Wait,
2464}
2465
2466fn evaluate_custom_routing(
2479 raw: &Value,
2480 output: &NodeOutput,
2481 state: &ExecutionState,
2482 flow_ir: &HostFlow,
2483 node_id: &NodeId,
2484) -> CustomRoutingDecision {
2485 let routes = match raw.as_array() {
2486 Some(arr) => arr,
2487 None => {
2488 tracing::warn!(
2489 flow_id = %flow_ir.id,
2490 node_id = %node_id,
2491 "custom routing is not an array; terminating"
2492 );
2493 return CustomRoutingDecision::End;
2494 }
2495 };
2496
2497 let ctx = build_routing_context(output, state);
2500
2501 let mut has_condition = false;
2502 for route in routes {
2503 let condition = route.get("condition").and_then(|v| v.as_str());
2504 let to = route.get("to").and_then(|v| v.as_str());
2505
2506 if let Some(cond) = condition {
2507 has_condition = true;
2508 if evaluate_simple_condition(cond, &ctx)
2509 && let Some(target) = to
2510 && let Ok(nid) = NodeId::new(target)
2511 {
2512 tracing::debug!(
2513 flow_id = %flow_ir.id,
2514 node_id = %node_id,
2515 condition = cond,
2516 target = target,
2517 "conditional route matched"
2518 );
2519 return CustomRoutingDecision::Next(nid);
2520 }
2521 } else if let Some(target) = to
2522 && let Ok(nid) = NodeId::new(target)
2523 {
2524 tracing::debug!(
2525 flow_id = %flow_ir.id,
2526 node_id = %node_id,
2527 target = target,
2528 "default route taken"
2529 );
2530 return CustomRoutingDecision::Next(nid);
2531 }
2532 }
2533
2534 if has_condition {
2541 tracing::debug!(
2542 flow_id = %flow_ir.id,
2543 node_id = %node_id,
2544 "no conditional route matched; pausing run at current node for resume"
2545 );
2546 CustomRoutingDecision::Wait
2547 } else {
2548 tracing::warn!(
2549 flow_id = %flow_ir.id,
2550 node_id = %node_id,
2551 "no route matched and no conditions present; terminating"
2552 );
2553 CustomRoutingDecision::End
2554 }
2555}
2556
2557fn evaluate_simple_condition(condition: &str, ctx: &Value) -> bool {
2562 let (path, expected, negate) = if let Some(idx) = condition.find("==") {
2564 let path = condition[..idx].trim();
2565 let val = condition[idx + 2..].trim().trim_matches('"');
2566 (path, val, false)
2567 } else if let Some(idx) = condition.find("!=") {
2568 let path = condition[..idx].trim();
2569 let val = condition[idx + 2..].trim().trim_matches('"');
2570 (path, val, true)
2571 } else {
2572 return false;
2573 };
2574
2575 let actual = resolve_dotted_path(ctx, path);
2577 let matches = actual
2578 .as_deref()
2579 .is_some_and(|a| a.eq_ignore_ascii_case(expected));
2580 if negate { !matches } else { matches }
2581}
2582
2583fn resolve_dotted_path(value: &Value, path: &str) -> Option<String> {
2585 let parts: Vec<&str> = path.split('.').collect();
2586 let mut current = value;
2587 for part in &parts {
2588 current = current.get(part)?;
2589 }
2590 match current {
2591 Value::String(s) => Some(s.clone()),
2592 Value::Bool(b) => Some(b.to_string()),
2593 Value::Number(n) => Some(n.to_string()),
2594 _ => Some(current.to_string()),
2595 }
2596}
2597
2598fn build_routing_context(output: &NodeOutput, state: &ExecutionState) -> Value {
2616 let mut ctx = match &output.payload {
2617 Value::Object(map) => map.clone(),
2618 _ => JsonMap::new(),
2619 };
2620
2621 let entry = &state.entry;
2622 ctx.insert("entry".into(), entry.clone());
2623 ctx.insert("in".into(), entry.clone());
2624
2625 let metadata = entry
2629 .pointer("/input/metadata")
2630 .or_else(|| entry.pointer("/metadata"));
2631
2632 let mut response = JsonMap::new();
2633 if let Some(Value::Object(meta)) = metadata {
2634 for (k, v) in meta {
2635 match v {
2637 Value::String(s) => {
2638 response.insert(k.clone(), Value::String(s.clone()));
2639 }
2640 other => {
2641 response.insert(k.clone(), other.clone());
2642 }
2643 }
2644 }
2645 }
2646 if let Some(text) = entry
2648 .pointer("/input/text")
2649 .or_else(|| entry.pointer("/text"))
2650 .filter(|t| !t.is_null())
2651 {
2652 response.insert("text".into(), text.clone());
2653 }
2654 ctx.insert("response".into(), Value::Object(response));
2655
2656 Value::Object(ctx)
2657}
2658
2659#[cfg(test)]
2660mod tests {
2661 use super::*;
2662 use crate::validate::{ValidationConfig, ValidationMode};
2663 use greentic_types::{
2664 Flow, FlowComponentRef, FlowId, FlowKind, InputMapping, Node, NodeId, OutputMapping,
2665 Routing, TelemetryHints,
2666 };
2667 use serde_json::json;
2668 use std::collections::{BTreeMap, HashMap as StdHashMap};
2669 use std::str::FromStr;
2670 use std::sync::Mutex;
2671 use tokio::runtime::Runtime;
2672
2673 fn minimal_engine() -> FlowEngine {
2674 FlowEngine {
2675 packs: Vec::new(),
2676 flows: Vec::new(),
2677 flow_sources: HashMap::new(),
2678 flow_cache: RwLock::new(HashMap::new()),
2679 default_env: "local".to_string(),
2680 validation: ValidationConfig {
2681 mode: ValidationMode::Off,
2682 },
2683 cross_pack_resolver: None,
2684 rollout_ids: RolloutIds::default(),
2685 }
2686 }
2687
2688 #[test]
2689 fn templating_renders_with_partials_and_data() {
2690 let mut state = ExecutionState::new(json!({ "city": "London" }));
2691 state.nodes.insert(
2692 "forecast".to_string(),
2693 NodeOutput::new(json!({ "temp": "20C" })),
2694 );
2695
2696 let ctx = state.context();
2698 assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
2699 }
2700
2701 #[test]
2702 fn finalize_wraps_emitted_payloads() {
2703 let mut state = ExecutionState::new(json!({}));
2704 state.push_egress(json!({ "text": "first" }));
2705 state.push_egress(json!({ "text": "second" }));
2706 let result = state.finalize_with(Some(json!({ "text": "final" })));
2707 assert_eq!(
2708 result,
2709 json!([
2710 { "text": "first" },
2711 { "text": "second" },
2712 { "text": "final" }
2713 ])
2714 );
2715 }
2716
2717 #[test]
2718 fn finalize_flattens_final_array() {
2719 let mut state = ExecutionState::new(json!({}));
2720 state.push_egress(json!({ "text": "only" }));
2721 let result = state.finalize_with(Some(json!([
2722 { "text": "extra-1" },
2723 { "text": "extra-2" }
2724 ])));
2725 assert_eq!(
2726 result,
2727 json!([
2728 { "text": "only" },
2729 { "text": "extra-1" },
2730 { "text": "extra-2" }
2731 ])
2732 );
2733 }
2734
2735 #[test]
2736 fn inject_card_locale_uses_entry_metadata_without_overwriting_payload() {
2737 let mut payload = json!({
2738 "card_source": "inline",
2739 "card_spec": { "title": "Hello" }
2740 });
2741 inject_card_locale(
2742 &mut payload,
2743 &json!({"input": {"metadata": {"locale": "nl-NL"}}}),
2744 );
2745 assert_eq!(payload["locale"], json!("nl-NL"));
2746
2747 let mut existing = json!({
2748 "card_source": "inline",
2749 "card_spec": { "title": "Hello" },
2750 "locale": "en-GB"
2751 });
2752 inject_card_locale(&mut existing, &json!({"metadata": {"locale": "nl-NL"}}));
2753 assert_eq!(existing["locale"], json!("en-GB"));
2754 }
2755
2756 #[test]
2757 fn load_i18n_bundle_entries_reads_manifest_and_falls_back_to_en() {
2758 let assets = StdHashMap::from([
2759 (
2760 "cards/i18n/_manifest.json".to_string(),
2761 br#"{"locales":["de"]}"#.to_vec(),
2762 ),
2763 (
2764 "cards/i18n/de.json".to_string(),
2765 br#"{"title":"Hallo"}"#.to_vec(),
2766 ),
2767 (
2768 "cards/i18n/en.json".to_string(),
2769 br#"{"title":"Hello"}"#.to_vec(),
2770 ),
2771 ]);
2772
2773 let entries = load_i18n_bundle_entries("cards/i18n", |path| {
2774 assets
2775 .get(path)
2776 .cloned()
2777 .with_context(|| format!("missing asset {path}"))
2778 });
2779
2780 assert_eq!(entries["de"]["title"], json!("Hallo"));
2781 assert_eq!(entries["en"]["title"], json!("Hello"));
2782 }
2783
2784 #[test]
2785 fn load_i18n_bundle_entries_reads_single_file_bundle() {
2786 let entries = load_i18n_bundle_entries("cards/i18n.json", |path| {
2787 if path == "cards/i18n.json" {
2788 Ok(br#"{"title":"Hello"}"#.to_vec())
2789 } else {
2790 bail!("unexpected asset {path}");
2791 }
2792 });
2793
2794 assert_eq!(entries["en"]["title"], json!("Hello"));
2795 }
2796
2797 struct TestCrossPackResolver;
2798
2799 impl CrossPackResolver for TestCrossPackResolver {
2800 fn invoke(
2801 &self,
2802 provider_id: &str,
2803 provider_type: Option<&str>,
2804 op: &str,
2805 input: &[u8],
2806 tenant: &str,
2807 team: Option<&str>,
2808 ) -> Result<Value> {
2809 Ok(json!({
2810 "provider_id": provider_id,
2811 "provider_type": provider_type,
2812 "op": op,
2813 "tenant": tenant,
2814 "team": team,
2815 "input": serde_json::from_slice::<Value>(input)?,
2816 }))
2817 }
2818 }
2819
2820 #[test]
2821 fn cross_pack_resolver_returns_node_output_when_present() {
2822 let mut engine = minimal_engine();
2823 engine.set_cross_pack_resolver(Arc::new(TestCrossPackResolver));
2824
2825 let output = engine
2826 .try_invoke_cross_pack_resolver(
2827 Some("mail"),
2828 Some("messaging"),
2829 "send",
2830 br#"{"subject":"hello"}"#,
2831 "demo",
2832 )
2833 .expect("resolver invocation")
2834 .expect("resolver output");
2835
2836 assert_eq!(
2837 output.payload,
2838 json!({
2839 "provider_id": "mail",
2840 "provider_type": "messaging",
2841 "op": "send",
2842 "tenant": "demo",
2843 "team": null,
2844 "input": { "subject": "hello" },
2845 })
2846 );
2847 }
2848
2849 #[test]
2850 fn parse_component_control_ignores_plain_payload() {
2851 let payload = json!({
2852 "flow": "not-a-control-field",
2853 "node": "n1"
2854 });
2855 let control = parse_component_control(&payload).expect("parse control");
2856 assert!(control.is_none());
2857 }
2858
2859 #[test]
2860 fn parse_component_control_parses_jump_marker() {
2861 let payload = json!({
2862 "greentic_control": {
2863 "action": "jump",
2864 "v": 1,
2865 "flow": "flow.b",
2866 "node": "node-2",
2867 "payload": { "message": "hi" },
2868 "hints": { "k": "v" },
2869 "max_redirects": 2,
2870 "reason": "handoff"
2871 }
2872 });
2873 let control = parse_component_control(&payload)
2874 .expect("parse control")
2875 .expect("missing control");
2876 match control {
2877 NodeControl::Jump(jump) => {
2878 assert_eq!(jump.flow, "flow.b");
2879 assert_eq!(jump.node.as_deref(), Some("node-2"));
2880 assert_eq!(jump.payload, json!({ "message": "hi" }));
2881 assert_eq!(jump.hints, json!({ "k": "v" }));
2882 assert_eq!(jump.max_redirects, Some(2));
2883 assert_eq!(jump.reason.as_deref(), Some("handoff"));
2884 }
2885 other => panic!("expected jump control, got {other:?}"),
2886 }
2887 }
2888
2889 #[test]
2890 fn parse_component_control_rejects_invalid_marker() {
2891 let payload = json!({
2892 "greentic_control": "bad-shape"
2893 });
2894 let err = parse_component_control(&payload).expect_err("expected invalid marker error");
2895 assert!(err.to_string().contains("greentic_control"));
2896 }
2897
2898 #[test]
2899 fn missing_operation_reports_node_and_component() {
2900 let engine = minimal_engine();
2901 let rt = Runtime::new().unwrap();
2902 let retry_config = RetryConfig {
2903 max_attempts: 1,
2904 base_delay_ms: 1,
2905 };
2906 let ctx = FlowContext {
2907 tenant: "tenant",
2908 pack_id: "test-pack",
2909 flow_id: "flow",
2910 node_id: Some("missing-op"),
2911 tool: None,
2912 action: None,
2913 session_id: None,
2914 provider_id: None,
2915 retry_config,
2916 attempt: 1,
2917 observer: None,
2918 mocks: None,
2919 };
2920 let node = HostNode {
2921 kind: NodeKind::Exec {
2922 target_component: "qa.process".into(),
2923 },
2924 component: "component.exec".into(),
2925 component_id: "component.exec".into(),
2926 operation_name: None,
2927 operation_in_mapping: None,
2928 payload_expr: Value::Null,
2929 routing: Routing::End,
2930 };
2931 let _state = ExecutionState::new(Value::Null);
2932 let payload = json!({ "component": "qa.process" });
2933 let event = NodeEvent {
2934 context: &ctx,
2935 node_id: "missing-op",
2936 node: &node,
2937 payload: &payload,
2938 };
2939 let err = rt
2940 .block_on(engine.execute_component_exec(
2941 &ctx,
2942 "missing-op",
2943 &node,
2944 payload.clone(),
2945 &event,
2946 ComponentOverrides {
2947 component: None,
2948 operation: None,
2949 },
2950 ))
2951 .unwrap_err();
2952 let message = err.to_string();
2953 assert!(
2954 message.contains("missing operation for node `missing-op`"),
2955 "unexpected message: {message}"
2956 );
2957 assert!(
2958 message.contains("(component `component.exec`)"),
2959 "unexpected message: {message}"
2960 );
2961 }
2962
2963 #[test]
2964 fn missing_operation_mentions_mapping_hint() {
2965 let engine = minimal_engine();
2966 let rt = Runtime::new().unwrap();
2967 let retry_config = RetryConfig {
2968 max_attempts: 1,
2969 base_delay_ms: 1,
2970 };
2971 let ctx = FlowContext {
2972 tenant: "tenant",
2973 pack_id: "test-pack",
2974 flow_id: "flow",
2975 node_id: Some("missing-op-hint"),
2976 tool: None,
2977 action: None,
2978 session_id: None,
2979 provider_id: None,
2980 retry_config,
2981 attempt: 1,
2982 observer: None,
2983 mocks: None,
2984 };
2985 let node = HostNode {
2986 kind: NodeKind::Exec {
2987 target_component: "qa.process".into(),
2988 },
2989 component: "component.exec".into(),
2990 component_id: "component.exec".into(),
2991 operation_name: None,
2992 operation_in_mapping: Some("render".into()),
2993 payload_expr: Value::Null,
2994 routing: Routing::End,
2995 };
2996 let _state = ExecutionState::new(Value::Null);
2997 let payload = json!({ "component": "qa.process" });
2998 let event = NodeEvent {
2999 context: &ctx,
3000 node_id: "missing-op-hint",
3001 node: &node,
3002 payload: &payload,
3003 };
3004 let err = rt
3005 .block_on(engine.execute_component_exec(
3006 &ctx,
3007 "missing-op-hint",
3008 &node,
3009 payload.clone(),
3010 &event,
3011 ComponentOverrides {
3012 component: None,
3013 operation: None,
3014 },
3015 ))
3016 .unwrap_err();
3017 let message = err.to_string();
3018 assert!(
3019 message.contains("missing operation for node `missing-op-hint`"),
3020 "unexpected message: {message}"
3021 );
3022 assert!(
3023 message.contains("Found operation in input.mapping (`render`)"),
3024 "unexpected message: {message}"
3025 );
3026 }
3027
3028 struct CountingObserver {
3029 starts: Mutex<Vec<String>>,
3030 ends: Mutex<Vec<Value>>,
3031 }
3032
3033 impl CountingObserver {
3034 fn new() -> Self {
3035 Self {
3036 starts: Mutex::new(Vec::new()),
3037 ends: Mutex::new(Vec::new()),
3038 }
3039 }
3040 }
3041
3042 impl ExecutionObserver for CountingObserver {
3043 fn on_node_start(&self, event: &NodeEvent<'_>) {
3044 self.starts.lock().unwrap().push(event.node_id.to_string());
3045 }
3046
3047 fn on_node_end(&self, _event: &NodeEvent<'_>, output: &Value) {
3048 self.ends.lock().unwrap().push(output.clone());
3049 }
3050
3051 fn on_node_error(&self, _event: &NodeEvent<'_>, _error: &dyn StdError) {}
3052 }
3053
3054 #[test]
3055 fn emits_end_event_for_successful_node() {
3056 let node_id = NodeId::from_str("emit").unwrap();
3057 let node = Node {
3058 id: node_id.clone(),
3059 component: FlowComponentRef {
3060 id: "emit.log".parse().unwrap(),
3061 pack_alias: None,
3062 operation: None,
3063 },
3064 input: InputMapping {
3065 mapping: json!({ "message": "logged" }),
3066 },
3067 output: OutputMapping {
3068 mapping: Value::Null,
3069 },
3070 err_map: None,
3071 routing: Routing::End,
3072 telemetry: TelemetryHints::default(),
3073 };
3074 let mut nodes = indexmap::IndexMap::default();
3075 nodes.insert(node_id.clone(), node);
3076 let flow = Flow {
3077 schema_version: "1.0".into(),
3078 id: FlowId::from_str("emit.flow").unwrap(),
3079 kind: FlowKind::Messaging,
3080 entrypoints: BTreeMap::from([(
3081 "default".to_string(),
3082 Value::String(node_id.to_string()),
3083 )]),
3084 nodes,
3085 metadata: Default::default(),
3086 };
3087 let host_flow = HostFlow::from(flow);
3088
3089 let engine = FlowEngine {
3090 packs: Vec::new(),
3091 flows: Vec::new(),
3092 flow_sources: HashMap::new(),
3093 flow_cache: RwLock::new(HashMap::from([(
3094 FlowKey {
3095 pack_id: "test-pack".to_string(),
3096 flow_id: "emit.flow".to_string(),
3097 },
3098 host_flow,
3099 )])),
3100 default_env: "local".to_string(),
3101 validation: ValidationConfig {
3102 mode: ValidationMode::Off,
3103 },
3104 cross_pack_resolver: None,
3105 rollout_ids: RolloutIds::default(),
3106 };
3107 let observer = CountingObserver::new();
3108 let ctx = FlowContext {
3109 tenant: "demo",
3110 pack_id: "test-pack",
3111 flow_id: "emit.flow",
3112 node_id: None,
3113 tool: None,
3114 action: None,
3115 session_id: None,
3116 provider_id: None,
3117 retry_config: RetryConfig {
3118 max_attempts: 1,
3119 base_delay_ms: 1,
3120 },
3121 attempt: 1,
3122 observer: Some(&observer),
3123 mocks: None,
3124 };
3125
3126 let rt = Runtime::new().unwrap();
3127 let result = rt.block_on(engine.execute(ctx, Value::Null)).unwrap();
3128 assert!(matches!(result.status, FlowStatus::Completed));
3129
3130 let starts = observer.starts.lock().unwrap();
3131 let ends = observer.ends.lock().unwrap();
3132 assert_eq!(starts.len(), 1);
3133 assert_eq!(ends.len(), 1);
3134 assert_eq!(ends[0], json!({ "message": "logged" }));
3135 }
3136
3137 fn host_flow_for_test(
3138 flow_id: &str,
3139 node_ids: &[&str],
3140 default_start: Option<&str>,
3141 ) -> HostFlow {
3142 let mut nodes = indexmap::IndexMap::default();
3143 for node_id in node_ids {
3144 let id = NodeId::from_str(node_id).unwrap();
3145 let node = Node {
3146 id: id.clone(),
3147 component: FlowComponentRef {
3148 id: "emit.log".parse().unwrap(),
3149 pack_alias: None,
3150 operation: None,
3151 },
3152 input: InputMapping {
3153 mapping: json!({ "message": node_id }),
3154 },
3155 output: OutputMapping {
3156 mapping: Value::Null,
3157 },
3158 err_map: None,
3159 routing: Routing::End,
3160 telemetry: TelemetryHints::default(),
3161 };
3162 nodes.insert(id, node);
3163 }
3164 let mut entrypoints = BTreeMap::new();
3165 if let Some(start) = default_start {
3166 entrypoints.insert("default".to_string(), Value::String(start.to_string()));
3167 }
3168 HostFlow::from(Flow {
3169 schema_version: "1.0".into(),
3170 id: FlowId::from_str(flow_id).unwrap(),
3171 kind: FlowKind::Messaging,
3172 entrypoints,
3173 nodes,
3174 metadata: Default::default(),
3175 })
3176 }
3177
3178 fn jump_test_engine() -> FlowEngine {
3179 let target_flow = host_flow_for_test("flow.target", &["node-a", "node-b"], None);
3180 FlowEngine {
3181 packs: Vec::new(),
3182 flows: Vec::new(),
3183 flow_sources: HashMap::new(),
3184 flow_cache: RwLock::new(HashMap::from([(
3185 FlowKey {
3186 pack_id: "test-pack".to_string(),
3187 flow_id: "flow.target".to_string(),
3188 },
3189 target_flow,
3190 )])),
3191 default_env: "local".to_string(),
3192 validation: ValidationConfig {
3193 mode: ValidationMode::Off,
3194 },
3195 cross_pack_resolver: None,
3196 rollout_ids: RolloutIds::default(),
3197 }
3198 }
3199
3200 fn jump_ctx<'a>(flow_id: &'a str) -> FlowContext<'a> {
3201 FlowContext {
3202 tenant: "demo",
3203 pack_id: "test-pack",
3204 flow_id,
3205 node_id: None,
3206 tool: None,
3207 action: None,
3208 session_id: None,
3209 provider_id: None,
3210 retry_config: RetryConfig {
3211 max_attempts: 1,
3212 base_delay_ms: 1,
3213 },
3214 attempt: 1,
3215 observer: None,
3216 mocks: None,
3217 }
3218 }
3219
3220 #[test]
3221 fn with_rollout_ids_binds_revision_identity() {
3222 let engine = minimal_engine().with_rollout_ids(RolloutIds {
3223 customer_id: Some("cust-acme".into()),
3224 deployment_id: Some("01JTKS".into()),
3225 bundle_id: Some("customer.support".into()),
3226 revision_id: Some("01JTKR".into()),
3227 });
3228 assert_eq!(engine.rollout_ids.revision_id.as_deref(), Some("01JTKR"));
3229 assert_eq!(engine.rollout_ids.deployment_id.as_deref(), Some("01JTKS"));
3230 assert!(minimal_engine().rollout_ids.is_empty());
3232 }
3233
3234 #[test]
3235 fn apply_jump_unknown_flow_errors() {
3236 let engine = minimal_engine();
3237 let mut state = ExecutionState::new(Value::Null);
3238 let rt = Runtime::new().unwrap();
3239 let err = rt
3240 .block_on(engine.apply_jump(
3241 &jump_ctx("flow.source"),
3242 &mut state,
3243 JumpControl {
3244 flow: "flow.missing".into(),
3245 node: None,
3246 payload: json!({ "ok": true }),
3247 hints: Value::Null,
3248 max_redirects: None,
3249 reason: None,
3250 },
3251 ))
3252 .unwrap_err();
3253 assert!(
3254 err.to_string().contains("unknown_flow"),
3255 "unexpected error: {err}"
3256 );
3257 }
3258
3259 #[test]
3260 fn apply_jump_unknown_node_errors() {
3261 let engine = jump_test_engine();
3262 let mut state = ExecutionState::new(Value::Null);
3263 let rt = Runtime::new().unwrap();
3264 let err = rt
3265 .block_on(engine.apply_jump(
3266 &jump_ctx("flow.source"),
3267 &mut state,
3268 JumpControl {
3269 flow: "flow.target".into(),
3270 node: Some("node-missing".into()),
3271 payload: json!({ "ok": true }),
3272 hints: Value::Null,
3273 max_redirects: None,
3274 reason: None,
3275 },
3276 ))
3277 .unwrap_err();
3278 assert!(
3279 err.to_string().contains("unknown_node"),
3280 "unexpected error: {err}"
3281 );
3282 }
3283
3284 #[test]
3285 fn apply_jump_uses_default_start_fallback() {
3286 let engine = jump_test_engine();
3287 let mut state = ExecutionState::new(Value::Null);
3288 let rt = Runtime::new().unwrap();
3289 let target = rt
3290 .block_on(engine.apply_jump(
3291 &jump_ctx("flow.source"),
3292 &mut state,
3293 JumpControl {
3294 flow: "flow.target".into(),
3295 node: None,
3296 payload: json!({ "k": "v" }),
3297 hints: Value::Null,
3298 max_redirects: None,
3299 reason: None,
3300 },
3301 ))
3302 .expect("jump target");
3303 assert_eq!(target.flow_id, "flow.target");
3304 assert_eq!(target.node_id.as_str(), "node-a");
3305 }
3306
3307 #[test]
3308 fn apply_jump_redirect_limit_enforced() {
3309 let engine = jump_test_engine();
3310 let mut state = ExecutionState::new(Value::Null);
3311 state.redirect_count = 3;
3312 let rt = Runtime::new().unwrap();
3313 let err = rt
3314 .block_on(engine.apply_jump(
3315 &jump_ctx("flow.source"),
3316 &mut state,
3317 JumpControl {
3318 flow: "flow.target".into(),
3319 node: None,
3320 payload: json!({ "k": "v" }),
3321 hints: Value::Null,
3322 max_redirects: Some(3),
3323 reason: None,
3324 },
3325 ))
3326 .unwrap_err();
3327 assert_eq!(err.to_string(), "redirect_limit");
3328 }
3329
3330 #[test]
3337 fn evaluate_custom_routing_waits_when_conditional_falls_through() {
3338 let raw_routing = json!([
3339 { "condition": "response.action == \"go\"", "to": "next" },
3340 { "out": true }
3341 ]);
3342 let flow_ir = HostFlow {
3343 id: "flow.test".to_string(),
3344 start: None,
3345 nodes: IndexMap::new(),
3346 slot_schema: None,
3347 };
3348 let current_node = NodeId::from_str("current").unwrap();
3349 let output = NodeOutput::new(Value::Null);
3350
3351 let mut state_empty = ExecutionState::new(json!({ "metadata": { "action": "" } }));
3353 state_empty.entry = json!({ "metadata": { "action": "" } });
3354 let decision_empty =
3355 evaluate_custom_routing(&raw_routing, &output, &state_empty, &flow_ir, ¤t_node);
3356 assert!(
3357 matches!(decision_empty, CustomRoutingDecision::Wait),
3358 "expected Wait on conditional fall-through, got {decision_empty:?}"
3359 );
3360
3361 let mut state_go = ExecutionState::new(json!({ "metadata": { "action": "go" } }));
3363 state_go.entry = json!({ "metadata": { "action": "go" } });
3364 let decision_go =
3365 evaluate_custom_routing(&raw_routing, &output, &state_go, &flow_ir, ¤t_node);
3366 match decision_go {
3367 CustomRoutingDecision::Next(nid) => assert_eq!(nid.as_str(), "next"),
3368 other => panic!("expected Next(\"next\"), got {other:?}"),
3369 }
3370 }
3371
3372 #[test]
3373 fn node_output_with_error_marks_ok_false_and_stashes_in_meta() {
3374 let err: Box<dyn std::error::Error + 'static> =
3375 Box::<dyn std::error::Error + 'static>::from("weatherapi returned 401 Unauthorized");
3376 let out = NodeOutput::with_error("call_weather", err.as_ref());
3377 assert!(!out.ok);
3378 assert_eq!(out.payload, Value::Null);
3379 assert_eq!(out.meta["error"]["kind"], "flow_node_failed");
3380 assert_eq!(out.meta["error"]["node_id"], "call_weather");
3381 assert_eq!(
3382 out.meta["error"]["message"],
3383 "weatherapi returned 401 Unauthorized"
3384 );
3385 }
3386
3387 #[test]
3388 fn lift_first_node_error_promotes_node_meta_to_output_metadata() {
3389 let mut nodes: HashMap<String, NodeOutput> = HashMap::new();
3394 let err: Box<dyn std::error::Error + 'static> =
3395 Box::<dyn std::error::Error + 'static>::from("weatherapi returned 401 Unauthorized");
3396 nodes.insert(
3397 "call_weather".to_string(),
3398 NodeOutput::with_error("call_weather", err.as_ref()),
3399 );
3400 nodes.insert(
3401 "render_current_card".to_string(),
3402 NodeOutput::new(json!({ "text": "message" })),
3403 );
3404
3405 let final_output = json!({ "text": "message" });
3406 let enriched = lift_first_node_error_from_nodes(final_output, &nodes);
3407 assert_eq!(
3408 enriched["metadata"]["error_kind"], "flow_node_failed",
3409 "first failing node's kind must be lifted"
3410 );
3411 assert_eq!(
3412 enriched["metadata"]["error_message"],
3413 "weatherapi returned 401 Unauthorized"
3414 );
3415 assert_eq!(enriched["metadata"]["node_id"], "call_weather");
3416 assert_eq!(enriched["text"], "message");
3419 }
3420
3421 #[test]
3422 fn lift_first_node_error_is_noop_when_all_nodes_ok() {
3423 let mut nodes: HashMap<String, NodeOutput> = HashMap::new();
3424 nodes.insert(
3425 "ok_node".to_string(),
3426 NodeOutput::new(json!({ "text": "all good" })),
3427 );
3428 let output = json!({ "text": "all good" });
3429 let lifted = lift_first_node_error_from_nodes(output.clone(), &nodes);
3430 assert_eq!(lifted, output);
3431 }
3432
3433 #[tokio::test]
3434 async fn execute_user_facing_flow_failure_returns_completed_with_error_envelope() {
3435 let flow_id_str = "broken.flow";
3440 let pack_id_str = "test-pack";
3441 let host_flow = host_flow_for_test(flow_id_str, &["only-node"], Some("does-not-exist"));
3442 let engine = FlowEngine {
3443 packs: Vec::new(),
3444 flows: Vec::new(),
3445 flow_sources: HashMap::new(),
3446 flow_cache: RwLock::new(HashMap::from([(
3447 FlowKey {
3448 pack_id: pack_id_str.to_string(),
3449 flow_id: flow_id_str.to_string(),
3450 },
3451 host_flow,
3452 )])),
3453 default_env: "local".to_string(),
3454 validation: ValidationConfig {
3455 mode: ValidationMode::Off,
3456 },
3457 cross_pack_resolver: None,
3458 rollout_ids: RolloutIds::default(),
3459 };
3460 let ctx = FlowContext {
3461 tenant: "demo",
3462 pack_id: pack_id_str,
3463 flow_id: flow_id_str,
3464 node_id: None,
3465 tool: None,
3466 action: None,
3467 session_id: Some("conv-1"),
3468 provider_id: None,
3469 retry_config: RetryConfig {
3470 max_attempts: 1,
3471 base_delay_ms: 1,
3472 },
3473 attempt: 1,
3474 observer: None,
3475 mocks: None,
3476 };
3477 let result = engine
3478 .execute(ctx, Value::Null)
3479 .await
3480 .expect("must not propagate Err");
3481 assert!(matches!(result.status, FlowStatus::Completed));
3482 assert_eq!(
3483 result.output["metadata"]["error_kind"],
3484 "flow_execution_failed"
3485 );
3486 let msg = result.output["metadata"]["error_message"]
3487 .as_str()
3488 .unwrap_or("");
3489 assert!(!msg.is_empty(), "error_message must be populated");
3490 assert_eq!(result.output["metadata"]["flow_id"], "broken.flow");
3491 }
3492
3493 #[test]
3494 fn mcp_tool_error_recognises_generator_error_shape() {
3495 let value = json!({
3498 "error": {
3499 "code": "tool_error",
3500 "message": "API request returned status 401",
3501 "status": 401
3502 }
3503 });
3504 let (code, message) = mcp_tool_error(&value).expect("must detect MCP error shape");
3505 assert_eq!(code, "tool_error");
3506 assert!(message.contains("API request returned status 401"));
3507 assert!(message.contains("(status 401)"));
3508 }
3509
3510 #[test]
3511 fn mcp_tool_error_skips_success_responses() {
3512 let value = json!({ "result": { "current": { "temp_c": 19.0 } } });
3514 assert!(mcp_tool_error(&value).is_none());
3515 }
3516
3517 #[test]
3518 fn mcp_tool_error_skips_non_object_and_unrelated_shapes() {
3519 assert!(mcp_tool_error(&Value::Null).is_none());
3520 assert!(mcp_tool_error(&json!({"unrelated": true})).is_none());
3521 assert!(mcp_tool_error(&json!({"error": "oops"})).is_none());
3523 }
3524
3525 #[tokio::test]
3526 async fn execute_non_user_facing_flow_failure_still_propagates() {
3527 let flow_id_str = "broken.flow";
3530 let pack_id_str = "test-pack";
3531 let host_flow = host_flow_for_test(flow_id_str, &["only-node"], Some("does-not-exist"));
3532 let engine = FlowEngine {
3533 packs: Vec::new(),
3534 flows: Vec::new(),
3535 flow_sources: HashMap::new(),
3536 flow_cache: RwLock::new(HashMap::from([(
3537 FlowKey {
3538 pack_id: pack_id_str.to_string(),
3539 flow_id: flow_id_str.to_string(),
3540 },
3541 host_flow,
3542 )])),
3543 default_env: "local".to_string(),
3544 validation: ValidationConfig {
3545 mode: ValidationMode::Off,
3546 },
3547 cross_pack_resolver: None,
3548 rollout_ids: RolloutIds::default(),
3549 };
3550 let ctx = FlowContext {
3551 tenant: "demo",
3552 pack_id: pack_id_str,
3553 flow_id: flow_id_str,
3554 node_id: None,
3555 tool: None,
3556 action: None,
3557 session_id: None,
3558 provider_id: None,
3559 retry_config: RetryConfig {
3560 max_attempts: 1,
3561 base_delay_ms: 1,
3562 },
3563 attempt: 1,
3564 observer: None,
3565 mocks: None,
3566 };
3567 let result = engine.execute(ctx, Value::Null).await;
3568 assert!(result.is_err(), "non-user-facing flow must propagate Err");
3569 }
3570
3571 #[test]
3574 fn host_flow_extracts_slot_schema_from_metadata_extra() {
3575 use greentic_types::FlowMetadata;
3576 use std::collections::BTreeSet;
3577
3578 let schema = json!([
3579 {"name": "counterparty", "slot_type": "string", "required": true},
3580 {"name": "due_date", "slot_type": "date", "required": true}
3581 ]);
3582 let flow = Flow {
3583 schema_version: "flow-v1".into(),
3584 id: FlowId::from_str("test.flow").unwrap(),
3585 kind: FlowKind::Messaging,
3586 entrypoints: BTreeMap::new(),
3587 nodes: IndexMap::default(),
3588 metadata: FlowMetadata {
3589 title: None,
3590 description: None,
3591 tags: BTreeSet::new(),
3592 extra: json!({(SLOT_SCHEMA_METADATA_KEY): schema}),
3593 },
3594 };
3595 let host = HostFlow::from(flow);
3596 assert_eq!(
3597 host.slot_schema.as_ref(),
3598 Some(&schema),
3599 "HostFlow must extract slot_schema from metadata.extra"
3600 );
3601 }
3602
3603 #[test]
3604 fn host_flow_slot_schema_is_none_when_absent() {
3605 let flow = Flow {
3606 schema_version: "flow-v1".into(),
3607 id: FlowId::from_str("test.flow").unwrap(),
3608 kind: FlowKind::Messaging,
3609 entrypoints: BTreeMap::new(),
3610 nodes: IndexMap::default(),
3611 metadata: Default::default(),
3612 };
3613 let host = HostFlow::from(flow);
3614 assert!(
3615 host.slot_schema.is_none(),
3616 "HostFlow.slot_schema must be None when metadata.extra has no greentic.slot_schema"
3617 );
3618 }
3619
3620 #[test]
3621 fn inject_slot_definitions_adds_to_object_input() {
3622 let schema = json!([
3623 {"name": "city", "slot_type": "string"}
3624 ]);
3625 let mut input = json!({"utterance": "hello"});
3626 inject_slot_definitions(&mut input, &schema, "f", "n");
3627 assert_eq!(
3628 input,
3629 json!({"utterance": "hello", "slot_definitions": schema}),
3630 "slot_definitions must be injected into existing object"
3631 );
3632 }
3633
3634 #[test]
3635 fn inject_slot_definitions_wraps_null_input() {
3636 let schema = json!([{"name": "x", "slot_type": "string"}]);
3637 let mut input = Value::Null;
3638 inject_slot_definitions(&mut input, &schema, "f", "n");
3639 assert_eq!(
3640 input,
3641 json!({"slot_definitions": schema}),
3642 "null input must become an object with slot_definitions"
3643 );
3644 }
3645
3646 #[test]
3647 fn inject_slot_definitions_preserves_explicit_inline() {
3648 let flow_schema = json!([{"name": "city", "slot_type": "string"}]);
3649 let inline_defs = json!([{"name": "country", "slot_type": "string"}]);
3650 let mut input = json!({
3651 "utterance": "hello",
3652 "slot_definitions": inline_defs
3653 });
3654 inject_slot_definitions(&mut input, &flow_schema, "f", "n");
3655 assert_eq!(
3656 input["slot_definitions"], inline_defs,
3657 "explicit inline slot_definitions must not be overwritten"
3658 );
3659 }
3660
3661 #[test]
3662 fn inject_slot_definitions_skips_non_object_input() {
3663 let schema = json!([{"name": "x", "slot_type": "string"}]);
3664 let mut input = json!("a string");
3665 inject_slot_definitions(&mut input, &schema, "f", "n");
3666 assert_eq!(
3667 input,
3668 json!("a string"),
3669 "non-object input must be left unchanged"
3670 );
3671 }
3672
3673 fn make_flow_doc_for_test(
3674 id: &str,
3675 node_name: &str,
3676 component: &str,
3677 slot_schema: Option<Value>,
3678 ) -> greentic_flow::model::FlowDoc {
3679 use greentic_flow::model::{FlowDoc, NodeDoc};
3680
3681 let mut nodes = IndexMap::new();
3682 nodes.insert(
3683 node_name.to_string(),
3684 NodeDoc {
3685 raw: {
3686 let mut m = IndexMap::new();
3687 m.insert(
3688 "component.exec".to_string(),
3689 json!({ "component": component }),
3690 );
3691 m
3692 },
3693 routing: json!([{ "out": true }]),
3694 ..Default::default()
3695 },
3696 );
3697
3698 FlowDoc {
3699 id: id.into(),
3700 title: None,
3701 description: None,
3702 flow_type: "messaging".into(),
3703 start: Some(node_name.into()),
3704 parameters: json!({}),
3705 tags: Vec::new(),
3706 schema_version: None,
3707 entrypoints: IndexMap::new(),
3708 meta: None,
3709 slot_schema,
3710 nodes,
3711 }
3712 }
3713
3714 #[test]
3720 fn compile_flow_round_trips_slot_schema_into_host_flow() {
3721 let slot_defs = json!([
3722 { "name": "counterparty", "slot_type": "string", "required": true,
3723 "pattern": ".+" },
3724 { "name": "due_date", "slot_type": "date", "required": true,
3725 "pattern": "\\d{4}-\\d{2}-\\d{2}" }
3726 ]);
3727 let doc = make_flow_doc_for_test(
3728 "slot-test",
3729 "extractor",
3730 "slot-extractor",
3731 Some(slot_defs.clone()),
3732 );
3733
3734 let flow = greentic_flow::compile_flow(doc).expect("compile_flow must succeed");
3735 assert_eq!(
3736 flow.metadata.extra.get(SLOT_SCHEMA_METADATA_KEY),
3737 Some(&slot_defs),
3738 "compile_flow must forward slot_schema into metadata.extra"
3739 );
3740
3741 let host = HostFlow::from(flow);
3742 assert_eq!(
3743 host.slot_schema.as_ref(),
3744 Some(&slot_defs),
3745 "HostFlow.slot_schema must survive the compile_flow -> HostFlow round-trip"
3746 );
3747 }
3748
3749 #[test]
3753 fn compile_flow_without_slot_schema_leaves_host_flow_none() {
3754 let doc = make_flow_doc_for_test("no-slots", "echo", "echo", None);
3755
3756 let flow = greentic_flow::compile_flow(doc).expect("compile_flow must succeed");
3757 assert!(
3758 flow.metadata.extra.get(SLOT_SCHEMA_METADATA_KEY).is_none(),
3759 "metadata.extra must not contain greentic.slot_schema when FlowDoc.slot_schema is None"
3760 );
3761
3762 let host = HostFlow::from(flow);
3763 assert!(
3764 host.slot_schema.is_none(),
3765 "HostFlow.slot_schema must be None when FlowDoc has no slot_schema"
3766 );
3767 }
3768}
3769
3770use tracing::Instrument;
3771
3772pub struct FlowContext<'a> {
3773 pub tenant: &'a str,
3774 pub pack_id: &'a str,
3775 pub flow_id: &'a str,
3776 pub node_id: Option<&'a str>,
3777 pub tool: Option<&'a str>,
3778 pub action: Option<&'a str>,
3779 pub session_id: Option<&'a str>,
3780 pub provider_id: Option<&'a str>,
3781 pub retry_config: RetryConfig,
3782 pub attempt: u32,
3783 pub observer: Option<&'a dyn ExecutionObserver>,
3784 pub mocks: Option<&'a MockLayer>,
3785}
3786
3787#[derive(Copy, Clone)]
3788pub struct RetryConfig {
3789 pub max_attempts: u32,
3790 pub base_delay_ms: u64,
3791}
3792
3793fn lift_first_node_error_from_nodes(output: Value, nodes: &HashMap<String, NodeOutput>) -> Value {
3808 let Some((node_id, failed)) = nodes.iter().find(|(_, out)| !out.ok) else {
3809 return output;
3810 };
3811 let err_meta = failed.meta.get("error");
3812 let message = err_meta
3813 .and_then(|e| e.get("message"))
3814 .and_then(|v| v.as_str())
3815 .unwrap_or("flow node failed");
3816 let kind = err_meta
3817 .and_then(|e| e.get("kind"))
3818 .and_then(|v| v.as_str())
3819 .unwrap_or("flow_node_failed");
3820
3821 let mut output = match output {
3822 Value::Object(map) => map,
3823 Value::Null => JsonMap::new(),
3824 other => {
3825 let mut wrap = JsonMap::new();
3826 wrap.insert("payload".to_string(), other);
3827 wrap
3828 }
3829 };
3830 let metadata_entry = output
3831 .entry("metadata".to_string())
3832 .or_insert_with(|| Value::Object(JsonMap::new()));
3833 let metadata_map = match metadata_entry {
3834 Value::Object(map) => map,
3835 _ => {
3836 *metadata_entry = Value::Object(JsonMap::new());
3837 metadata_entry.as_object_mut().unwrap()
3838 }
3839 };
3840 metadata_map
3841 .entry("error_kind".to_string())
3842 .or_insert(Value::String(kind.to_string()));
3843 metadata_map
3844 .entry("error_message".to_string())
3845 .or_insert(Value::String(message.to_string()));
3846 metadata_map
3847 .entry("node_id".to_string())
3848 .or_insert(Value::String(node_id.clone()));
3849 Value::Object(output)
3850}
3851
3852fn should_retry(err: &anyhow::Error) -> bool {
3853 let lower = err.to_string().to_lowercase();
3854 lower.contains("transient")
3855 || lower.contains("unavailable")
3856 || lower.contains("internal")
3857 || lower.contains("timeout")
3858}
3859
3860impl From<FlowRetryConfig> for RetryConfig {
3861 fn from(value: FlowRetryConfig) -> Self {
3862 Self {
3863 max_attempts: value.max_attempts.max(1),
3864 base_delay_ms: value.base_delay_ms.max(50),
3865 }
3866 }
3867}