1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use crate::config::{FlowRetryConfig, HostConfig};
18use crate::pack::{FlowDescriptor, PackRuntime};
19use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
20use greentic_types::{Flow, Node, NodeId, Routing};
21
22pub struct FlowEngine {
23 packs: Vec<Arc<PackRuntime>>,
24 flows: Vec<FlowDescriptor>,
25 flow_sources: HashMap<String, usize>,
26 flow_cache: RwLock<HashMap<String, HostFlow>>,
27 default_env: String,
28}
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
31pub struct FlowSnapshot {
32 pub flow_id: String,
33 pub next_node: String,
34 pub state: ExecutionState,
35}
36
37#[derive(Clone, Debug)]
38pub struct FlowWait {
39 pub reason: Option<String>,
40 pub snapshot: FlowSnapshot,
41}
42
43#[derive(Clone, Debug)]
44pub enum FlowStatus {
45 Completed,
46 Waiting(FlowWait),
47}
48
49#[derive(Clone, Debug)]
50pub struct FlowExecution {
51 pub output: Value,
52 pub status: FlowStatus,
53}
54
55#[derive(Clone, Debug)]
56struct HostFlow {
57 id: String,
58 start: Option<NodeId>,
59 nodes: IndexMap<NodeId, HostNode>,
60}
61
62#[derive(Clone, Debug)]
63pub struct HostNode {
64 kind: NodeKind,
65 pub component: String,
67 component_id: String,
68 operation_name: Option<String>,
69 operation_in_mapping: Option<String>,
70 payload_expr: Value,
71 routing: Routing,
72}
73
74#[derive(Clone, Debug)]
75enum NodeKind {
76 Exec { target_component: String },
77 PackComponent { component_ref: String },
78 ProviderInvoke,
79 FlowCall,
80 BuiltinEmit { kind: EmitKind },
81 Wait,
82}
83
84#[derive(Clone, Debug)]
85enum EmitKind {
86 Log,
87 Response,
88 Other(String),
89}
90
91struct ComponentOverrides<'a> {
92 component: Option<&'a str>,
93 operation: Option<&'a str>,
94}
95
96struct ComponentCall {
97 component_ref: String,
98 operation: String,
99 input: Value,
100 config: Value,
101}
102
103impl FlowExecution {
104 fn completed(output: Value) -> Self {
105 Self {
106 output,
107 status: FlowStatus::Completed,
108 }
109 }
110
111 fn waiting(output: Value, wait: FlowWait) -> Self {
112 Self {
113 output,
114 status: FlowStatus::Waiting(wait),
115 }
116 }
117}
118
119impl FlowEngine {
120 pub async fn new(packs: Vec<Arc<PackRuntime>>, _config: Arc<HostConfig>) -> Result<Self> {
121 let mut flow_sources = HashMap::new();
122 let mut descriptors = Vec::new();
123 for (idx, pack) in packs.iter().enumerate() {
124 let flows = pack.list_flows().await?;
125 for flow in flows {
126 tracing::info!(
127 flow_id = %flow.id,
128 flow_type = %flow.flow_type,
129 pack_index = idx,
130 "registered flow"
131 );
132 flow_sources.insert(flow.id.clone(), idx);
133 descriptors.retain(|existing: &FlowDescriptor| existing.id != flow.id);
134 descriptors.push(flow);
135 }
136 }
137
138 let mut flow_map = HashMap::new();
139 for flow in &descriptors {
140 if let Some(&pack_idx) = flow_sources.get(&flow.id) {
141 let pack_clone = Arc::clone(&packs[pack_idx]);
142 let flow_id = flow.id.clone();
143 let task_flow_id = flow_id.clone();
144 match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
145 Ok(Ok(flow)) => {
146 flow_map.insert(flow_id, HostFlow::from(flow));
147 }
148 Ok(Err(err)) => {
149 tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
150 }
151 Err(err) => {
152 tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
153 }
154 }
155 }
156 }
157
158 Ok(Self {
159 packs,
160 flows: descriptors,
161 flow_sources,
162 flow_cache: RwLock::new(flow_map),
163 default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
164 })
165 }
166
167 async fn get_or_load_flow(&self, flow_id: &str) -> Result<HostFlow> {
168 if let Some(flow) = self.flow_cache.read().get(flow_id).cloned() {
169 return Ok(flow);
170 }
171
172 let pack_idx = *self
173 .flow_sources
174 .get(flow_id)
175 .with_context(|| format!("flow {flow_id} not registered"))?;
176 let pack = Arc::clone(&self.packs[pack_idx]);
177 let flow_id_owned = flow_id.to_string();
178 let task_flow_id = flow_id_owned.clone();
179 let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
180 .await
181 .context("failed to join flow metadata task")??;
182 let host_flow = HostFlow::from(flow);
183 self.flow_cache
184 .write()
185 .insert(flow_id_owned.clone(), host_flow.clone());
186 Ok(host_flow)
187 }
188
189 pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
190 let span = tracing::info_span!(
191 "flow.execute",
192 tenant = tracing::field::Empty,
193 flow_id = tracing::field::Empty,
194 node_id = tracing::field::Empty,
195 tool = tracing::field::Empty,
196 action = tracing::field::Empty
197 );
198 annotate_span(
199 &span,
200 &FlowSpanAttributes {
201 tenant: ctx.tenant,
202 flow_id: ctx.flow_id,
203 node_id: ctx.node_id,
204 tool: ctx.tool,
205 action: ctx.action,
206 },
207 );
208 set_flow_context(
209 &self.default_env,
210 ctx.tenant,
211 ctx.flow_id,
212 ctx.node_id,
213 ctx.provider_id,
214 ctx.session_id,
215 );
216 let retry_config = ctx.retry_config;
217 let original_input = input;
218 async move {
219 let mut attempt = 0u32;
220 loop {
221 attempt += 1;
222 match self.execute_once(&ctx, original_input.clone()).await {
223 Ok(value) => return Ok(value),
224 Err(err) => {
225 if attempt >= retry_config.max_attempts || !should_retry(&err) {
226 return Err(err);
227 }
228 let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
229 tracing::warn!(
230 tenant = ctx.tenant,
231 flow_id = ctx.flow_id,
232 attempt,
233 max_attempts = retry_config.max_attempts,
234 delay_ms = delay,
235 error = %err,
236 "transient flow execution failure, backing off"
237 );
238 tokio::time::sleep(Duration::from_millis(delay)).await;
239 }
240 }
241 }
242 }
243 .instrument(span)
244 .await
245 }
246
247 pub async fn resume(
248 &self,
249 ctx: FlowContext<'_>,
250 snapshot: FlowSnapshot,
251 input: Value,
252 ) -> Result<FlowExecution> {
253 if snapshot.flow_id != ctx.flow_id {
254 bail!(
255 "snapshot flow {} does not match requested {}",
256 snapshot.flow_id,
257 ctx.flow_id
258 );
259 }
260 let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
261 let mut state = snapshot.state;
262 state.replace_input(input);
263 self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
264 .await
265 }
266
267 async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
268 let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
269 let state = ExecutionState::new(input);
270 self.drive_flow(ctx, flow_ir, state, None).await
271 }
272
273 async fn drive_flow(
274 &self,
275 ctx: &FlowContext<'_>,
276 flow_ir: HostFlow,
277 mut state: ExecutionState,
278 resume_from: Option<String>,
279 ) -> Result<FlowExecution> {
280 let mut current = match resume_from {
281 Some(node) => NodeId::from_str(&node)
282 .with_context(|| format!("invalid resume node id `{node}`"))?,
283 None => flow_ir
284 .start
285 .clone()
286 .or_else(|| flow_ir.nodes.keys().next().cloned())
287 .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
288 };
289
290 loop {
291 let node = flow_ir
292 .nodes
293 .get(¤t)
294 .with_context(|| format!("node {} not found", current.as_str()))?;
295
296 let payload = node.payload_expr.clone();
297 let observed_payload = payload.clone();
298 let node_id = current.clone();
299 let event = NodeEvent {
300 context: ctx,
301 node_id: node_id.as_str(),
302 node,
303 payload: &observed_payload,
304 };
305 if let Some(observer) = ctx.observer {
306 observer.on_node_start(&event);
307 }
308 let DispatchOutcome {
309 output,
310 wait_reason,
311 } = self
312 .dispatch_node(ctx, node_id.as_str(), node, &mut state, payload)
313 .await?;
314
315 state.nodes.insert(node_id.clone().into(), output.clone());
316 if let Some(observer) = ctx.observer {
317 observer.on_node_end(&event, &output.payload);
318 }
319
320 let (next, should_exit) = match &node.routing {
321 Routing::Next { node_id } => (Some(node_id.clone()), false),
322 Routing::End | Routing::Reply => (None, true),
323 Routing::Branch { default, .. } => (default.clone(), default.is_none()),
324 Routing::Custom(raw) => {
325 tracing::warn!(
326 flow_id = %flow_ir.id,
327 node_id = %node_id,
328 routing = ?raw,
329 "unsupported routing; terminating flow"
330 );
331 (None, true)
332 }
333 };
334
335 if let Some(wait_reason) = wait_reason {
336 let resume_target = next.clone().ok_or_else(|| {
337 anyhow!(
338 "session.wait node {} requires a non-empty route",
339 current.as_str()
340 )
341 })?;
342 let mut snapshot_state = state.clone();
343 snapshot_state.clear_egress();
344 let snapshot = FlowSnapshot {
345 flow_id: ctx.flow_id.to_string(),
346 next_node: resume_target.as_str().to_string(),
347 state: snapshot_state,
348 };
349 let output_value = state.clone().finalize_with(None);
350 return Ok(FlowExecution::waiting(
351 output_value,
352 FlowWait {
353 reason: Some(wait_reason),
354 snapshot,
355 },
356 ));
357 }
358
359 if should_exit {
360 return Ok(FlowExecution::completed(
361 state.finalize_with(Some(output.payload.clone())),
362 ));
363 }
364
365 match next {
366 Some(n) => current = n,
367 None => {
368 return Ok(FlowExecution::completed(
369 state.finalize_with(Some(output.payload.clone())),
370 ));
371 }
372 }
373 }
374 }
375
376 async fn dispatch_node(
377 &self,
378 ctx: &FlowContext<'_>,
379 node_id: &str,
380 node: &HostNode,
381 state: &mut ExecutionState,
382 payload: Value,
383 ) -> Result<DispatchOutcome> {
384 match &node.kind {
385 NodeKind::Exec { target_component } => self
386 .execute_component_exec(
387 ctx,
388 node_id,
389 node,
390 state,
391 payload,
392 ComponentOverrides {
393 component: Some(target_component.as_str()),
394 operation: node.operation_name.as_deref(),
395 },
396 )
397 .await
398 .map(DispatchOutcome::complete),
399 NodeKind::PackComponent { component_ref } => self
400 .execute_component_call(ctx, node_id, node, state, payload, component_ref.as_str())
401 .await
402 .map(DispatchOutcome::complete),
403 NodeKind::FlowCall => self
404 .execute_flow_call(ctx, payload)
405 .await
406 .map(DispatchOutcome::complete),
407 NodeKind::ProviderInvoke => self
408 .execute_provider_invoke(ctx, node_id, state, payload)
409 .await
410 .map(DispatchOutcome::complete),
411 NodeKind::BuiltinEmit { kind } => {
412 match kind {
413 EmitKind::Log | EmitKind::Response => {}
414 EmitKind::Other(component) => {
415 tracing::debug!(%component, "handling emit.* as builtin");
416 }
417 }
418 state.push_egress(payload.clone());
419 Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
420 }
421 NodeKind::Wait => {
422 let reason = extract_wait_reason(&payload);
423 Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
424 }
425 }
426 }
427
428 async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
429 #[derive(Deserialize)]
430 struct FlowCallPayload {
431 #[serde(alias = "flow")]
432 flow_id: String,
433 #[serde(default)]
434 input: Value,
435 }
436
437 let call: FlowCallPayload =
438 serde_json::from_value(payload).context("invalid payload for flow.call node")?;
439 if call.flow_id.trim().is_empty() {
440 bail!("flow.call requires a non-empty flow_id");
441 }
442
443 let sub_input = if call.input.is_null() {
444 Value::Null
445 } else {
446 call.input
447 };
448
449 let flow_id_owned = call.flow_id;
450 let action = "flow.call";
451 let sub_ctx = FlowContext {
452 tenant: ctx.tenant,
453 flow_id: flow_id_owned.as_str(),
454 node_id: None,
455 tool: ctx.tool,
456 action: Some(action),
457 session_id: ctx.session_id,
458 provider_id: ctx.provider_id,
459 retry_config: ctx.retry_config,
460 observer: ctx.observer,
461 mocks: ctx.mocks,
462 };
463
464 let execution = Box::pin(self.execute(sub_ctx, sub_input))
465 .await
466 .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
467 match execution.status {
468 FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
469 FlowStatus::Waiting(wait) => bail!(
470 "flow.call cannot pause (flow {} waiting {:?})",
471 flow_id_owned,
472 wait.reason
473 ),
474 }
475 }
476
477 async fn execute_component_exec(
478 &self,
479 ctx: &FlowContext<'_>,
480 node_id: &str,
481 node: &HostNode,
482 state: &ExecutionState,
483 payload: Value,
484 overrides: ComponentOverrides<'_>,
485 ) -> Result<NodeOutput> {
486 #[derive(Deserialize)]
487 struct ComponentPayload {
488 #[serde(default, alias = "component_ref", alias = "component")]
489 component: Option<String>,
490 #[serde(alias = "op")]
491 operation: Option<String>,
492 #[serde(default)]
493 input: Value,
494 #[serde(default)]
495 config: Value,
496 }
497
498 let payload: ComponentPayload =
499 serde_json::from_value(payload).context("invalid payload for component.exec")?;
500 let component_ref = overrides
501 .component
502 .map(str::to_string)
503 .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
504 .with_context(|| "component.exec requires a component_ref")?;
505 let operation = resolve_component_operation(
506 node_id,
507 node.component_id.as_str(),
508 payload.operation,
509 overrides.operation,
510 node.operation_in_mapping.as_deref(),
511 )?;
512 let call = ComponentCall {
513 component_ref,
514 operation,
515 input: payload.input,
516 config: payload.config,
517 };
518
519 self.invoke_component_call(ctx, node_id, state, call).await
520 }
521
522 async fn execute_component_call(
523 &self,
524 ctx: &FlowContext<'_>,
525 node_id: &str,
526 node: &HostNode,
527 state: &ExecutionState,
528 payload: Value,
529 component_ref: &str,
530 ) -> Result<NodeOutput> {
531 let payload_operation = extract_operation_from_mapping(&payload);
532 let (input, config) = split_operation_payload(payload);
533 let operation = resolve_component_operation(
534 node_id,
535 node.component_id.as_str(),
536 payload_operation,
537 node.operation_name.as_deref(),
538 node.operation_in_mapping.as_deref(),
539 )?;
540 let call = ComponentCall {
541 component_ref: component_ref.to_string(),
542 operation,
543 input,
544 config,
545 };
546 self.invoke_component_call(ctx, node_id, state, call).await
547 }
548
549 async fn invoke_component_call(
550 &self,
551 ctx: &FlowContext<'_>,
552 node_id: &str,
553 state: &ExecutionState,
554 mut call: ComponentCall,
555 ) -> Result<NodeOutput> {
556 if let Value::Object(ref mut map) = call.input {
557 map.entry("state".to_string())
558 .or_insert_with(|| state.context());
559 }
560 let input_json = serde_json::to_string(&call.input)?;
561 let config_json = if call.config.is_null() {
562 None
563 } else {
564 Some(serde_json::to_string(&call.config)?)
565 };
566
567 let pack_idx = *self
568 .flow_sources
569 .get(ctx.flow_id)
570 .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
571 let pack = Arc::clone(&self.packs[pack_idx]);
572 let exec_ctx = component_exec_ctx(ctx, node_id);
573 let value = pack
574 .invoke_component(
575 call.component_ref.as_str(),
576 exec_ctx,
577 call.operation.as_str(),
578 config_json,
579 input_json,
580 )
581 .await?;
582
583 Ok(NodeOutput::new(value))
584 }
585
586 async fn execute_provider_invoke(
587 &self,
588 ctx: &FlowContext<'_>,
589 node_id: &str,
590 state: &ExecutionState,
591 payload: Value,
592 ) -> Result<NodeOutput> {
593 #[derive(Deserialize)]
594 struct ProviderPayload {
595 #[serde(default)]
596 provider_id: Option<String>,
597 #[serde(default)]
598 provider_type: Option<String>,
599 #[serde(default, alias = "operation")]
600 op: Option<String>,
601 #[serde(default)]
602 input: Value,
603 #[serde(default)]
604 in_map: Value,
605 #[serde(default)]
606 out_map: Value,
607 #[serde(default)]
608 err_map: Value,
609 }
610
611 let payload: ProviderPayload =
612 serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
613 let op = payload
614 .op
615 .as_deref()
616 .filter(|v| !v.trim().is_empty())
617 .with_context(|| "provider.invoke requires an op")?
618 .to_string();
619
620 let mut input_value = if !payload.in_map.is_null() {
621 let ctx_value = mapping_ctx(payload.input.clone(), state);
622 apply_mapping(&payload.in_map, &ctx_value)
623 .context("failed to evaluate provider.invoke in_map")?
624 } else if !payload.input.is_null() {
625 payload.input
626 } else {
627 Value::Null
628 };
629
630 if let Value::Object(ref mut map) = input_value {
631 map.entry("state".to_string())
632 .or_insert_with(|| state.context());
633 }
634 let input_json = serde_json::to_vec(&input_value)?;
635
636 let pack_idx = *self
637 .flow_sources
638 .get(ctx.flow_id)
639 .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
640 let pack = Arc::clone(&self.packs[pack_idx]);
641 let binding = pack.resolve_provider(
642 payload.provider_id.as_deref(),
643 payload.provider_type.as_deref(),
644 )?;
645 let exec_ctx = component_exec_ctx(ctx, node_id);
646 let result = pack
647 .invoke_provider(&binding, exec_ctx, &op, input_json)
648 .await?;
649
650 let output = if payload.out_map.is_null() {
651 result
652 } else {
653 let ctx_value = mapping_ctx(result, state);
654 apply_mapping(&payload.out_map, &ctx_value)
655 .context("failed to evaluate provider.invoke out_map")?
656 };
657 let _ = payload.err_map;
658 Ok(NodeOutput::new(output))
659 }
660
661 pub fn flows(&self) -> &[FlowDescriptor] {
662 &self.flows
663 }
664
665 pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
666 self.flows
667 .iter()
668 .find(|descriptor| descriptor.flow_type == flow_type)
669 }
670
671 pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
672 self.flows
673 .iter()
674 .find(|descriptor| descriptor.id == flow_id)
675 }
676}
677
678pub trait ExecutionObserver: Send + Sync {
679 fn on_node_start(&self, event: &NodeEvent<'_>);
680 fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
681 fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
682}
683
684pub struct NodeEvent<'a> {
685 pub context: &'a FlowContext<'a>,
686 pub node_id: &'a str,
687 pub node: &'a HostNode,
688 pub payload: &'a Value,
689}
690
691#[derive(Clone, Debug, Serialize, Deserialize)]
692pub struct ExecutionState {
693 input: Value,
694 nodes: HashMap<String, NodeOutput>,
695 egress: Vec<Value>,
696}
697
698impl ExecutionState {
699 fn new(input: Value) -> Self {
700 Self {
701 input,
702 nodes: HashMap::new(),
703 egress: Vec::new(),
704 }
705 }
706
707 fn context(&self) -> Value {
708 let mut nodes = JsonMap::new();
709 for (id, output) in &self.nodes {
710 nodes.insert(
711 id.clone(),
712 json!({
713 "ok": output.ok,
714 "payload": output.payload.clone(),
715 "meta": output.meta.clone(),
716 }),
717 );
718 }
719 json!({
720 "input": self.input.clone(),
721 "nodes": nodes,
722 })
723 }
724 fn push_egress(&mut self, payload: Value) {
725 self.egress.push(payload);
726 }
727
728 fn replace_input(&mut self, input: Value) {
729 self.input = input;
730 }
731
732 fn clear_egress(&mut self) {
733 self.egress.clear();
734 }
735
736 fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
737 if self.egress.is_empty() {
738 return final_payload.unwrap_or(Value::Null);
739 }
740 let mut emitted = std::mem::take(&mut self.egress);
741 if let Some(value) = final_payload {
742 match value {
743 Value::Null => {}
744 Value::Array(items) => emitted.extend(items),
745 other => emitted.push(other),
746 }
747 }
748 Value::Array(emitted)
749 }
750}
751
752#[derive(Clone, Debug, Serialize, Deserialize)]
753struct NodeOutput {
754 ok: bool,
755 payload: Value,
756 meta: Value,
757}
758
759impl NodeOutput {
760 fn new(payload: Value) -> Self {
761 Self {
762 ok: true,
763 payload,
764 meta: Value::Null,
765 }
766 }
767}
768
769struct DispatchOutcome {
770 output: NodeOutput,
771 wait_reason: Option<String>,
772}
773
774impl DispatchOutcome {
775 fn complete(output: NodeOutput) -> Self {
776 Self {
777 output,
778 wait_reason: None,
779 }
780 }
781
782 fn wait(output: NodeOutput, reason: Option<String>) -> Self {
783 Self {
784 output,
785 wait_reason: reason,
786 }
787 }
788}
789
790fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
791 ComponentExecCtx {
792 tenant: ComponentTenantCtx {
793 tenant: ctx.tenant.to_string(),
794 team: None,
795 user: ctx.provider_id.map(str::to_string),
796 trace_id: None,
797 correlation_id: ctx.session_id.map(str::to_string),
798 deadline_unix_ms: None,
799 attempt: 1,
800 idempotency_key: ctx.session_id.map(str::to_string),
801 },
802 flow_id: ctx.flow_id.to_string(),
803 node_id: Some(node_id.to_string()),
804 }
805}
806
807fn extract_wait_reason(payload: &Value) -> Option<String> {
808 match payload {
809 Value::String(s) => Some(s.clone()),
810 Value::Object(map) => map
811 .get("reason")
812 .and_then(Value::as_str)
813 .map(|value| value.to_string()),
814 _ => None,
815 }
816}
817
818fn mapping_ctx(root: Value, state: &ExecutionState) -> Value {
819 json!({
820 "input": root.clone(),
821 "result": root,
822 "state": state.context(),
823 })
824}
825
826fn apply_mapping(template: &Value, ctx: &Value) -> Result<Value> {
827 match template {
828 Value::String(path) if path.starts_with('/') => ctx
829 .pointer(path)
830 .cloned()
831 .ok_or_else(|| anyhow!("mapping path `{path}` not found")),
832 Value::Array(items) => {
833 let mut mapped = Vec::with_capacity(items.len());
834 for item in items {
835 mapped.push(apply_mapping(item, ctx)?);
836 }
837 Ok(Value::Array(mapped))
838 }
839 Value::Object(map) => {
840 let mut out = serde_json::Map::new();
841 for (key, value) in map {
842 out.insert(key.clone(), apply_mapping(value, ctx)?);
843 }
844 Ok(Value::Object(out))
845 }
846 other => Ok(other.clone()),
847 }
848}
849
850impl From<Flow> for HostFlow {
851 fn from(value: Flow) -> Self {
852 let mut nodes = IndexMap::new();
853 for (id, node) in value.nodes {
854 nodes.insert(id.clone(), HostNode::from(node));
855 }
856 let start = value
857 .entrypoints
858 .get("default")
859 .and_then(Value::as_str)
860 .and_then(|id| NodeId::from_str(id).ok())
861 .or_else(|| nodes.keys().next().cloned());
862 Self {
863 id: value.id.as_str().to_string(),
864 start,
865 nodes,
866 }
867 }
868}
869
870impl From<Node> for HostNode {
871 fn from(node: Node) -> Self {
872 let component_ref = node.component.id.as_str().to_string();
873 let raw_operation = node.component.operation.clone();
874 let operation_in_mapping = extract_operation_from_mapping(&node.input.mapping);
875 let operation_is_component_exec = raw_operation.as_deref() == Some("component.exec");
876 let operation_is_emit = raw_operation
877 .as_deref()
878 .map(|op| op.starts_with("emit."))
879 .unwrap_or(false);
880 let is_component_exec = component_ref == "component.exec" || operation_is_component_exec;
881
882 let kind = if is_component_exec {
883 let target = if component_ref == "component.exec" {
884 if let Some(op) = raw_operation
885 .as_deref()
886 .filter(|op| op.starts_with("emit."))
887 {
888 op.to_string()
889 } else {
890 extract_target_component(&node.input.mapping)
891 .unwrap_or_else(|| "component.exec".to_string())
892 }
893 } else {
894 extract_target_component(&node.input.mapping)
895 .unwrap_or_else(|| component_ref.clone())
896 };
897 if target.starts_with("emit.") {
898 NodeKind::BuiltinEmit {
899 kind: emit_kind_from_ref(&target),
900 }
901 } else {
902 NodeKind::Exec {
903 target_component: target,
904 }
905 }
906 } else if operation_is_emit {
907 NodeKind::BuiltinEmit {
908 kind: emit_kind_from_ref(raw_operation.as_deref().unwrap_or("emit.log")),
909 }
910 } else {
911 match component_ref.as_str() {
912 "flow.call" => NodeKind::FlowCall,
913 "provider.invoke" => NodeKind::ProviderInvoke,
914 "session.wait" => NodeKind::Wait,
915 comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
916 kind: emit_kind_from_ref(comp),
917 },
918 other => NodeKind::PackComponent {
919 component_ref: other.to_string(),
920 },
921 }
922 };
923 let component_label = match &kind {
924 NodeKind::Exec { .. } => "component.exec".to_string(),
925 NodeKind::PackComponent { component_ref } => component_ref.clone(),
926 NodeKind::ProviderInvoke => "provider.invoke".to_string(),
927 NodeKind::FlowCall => "flow.call".to_string(),
928 NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
929 NodeKind::Wait => "session.wait".to_string(),
930 };
931 let operation_name = if is_component_exec && operation_is_component_exec {
932 None
933 } else {
934 raw_operation.clone()
935 };
936 let payload_expr = match kind {
937 NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
938 _ => node.input.mapping.clone(),
939 };
940 Self {
941 kind,
942 component: component_label,
943 component_id: if is_component_exec {
944 "component.exec".to_string()
945 } else {
946 component_ref
947 },
948 operation_name,
949 operation_in_mapping,
950 payload_expr,
951 routing: node.routing,
952 }
953 }
954}
955
956fn extract_target_component(payload: &Value) -> Option<String> {
957 match payload {
958 Value::Object(map) => map
959 .get("component")
960 .or_else(|| map.get("component_ref"))
961 .and_then(Value::as_str)
962 .map(|s| s.to_string()),
963 _ => None,
964 }
965}
966
967fn extract_operation_from_mapping(payload: &Value) -> Option<String> {
968 match payload {
969 Value::Object(map) => map
970 .get("operation")
971 .or_else(|| map.get("op"))
972 .and_then(Value::as_str)
973 .map(str::trim)
974 .filter(|value| !value.is_empty())
975 .map(|value| value.to_string()),
976 _ => None,
977 }
978}
979
980fn extract_emit_payload(payload: &Value) -> Value {
981 if let Value::Object(map) = payload {
982 if let Some(input) = map.get("input") {
983 return input.clone();
984 }
985 if let Some(inner) = map.get("payload") {
986 return inner.clone();
987 }
988 }
989 payload.clone()
990}
991
992fn split_operation_payload(payload: Value) -> (Value, Value) {
993 if let Value::Object(mut map) = payload.clone()
994 && map.contains_key("input")
995 {
996 let input = map.remove("input").unwrap_or(Value::Null);
997 let config = map.remove("config").unwrap_or(Value::Null);
998 let legacy_only = map.keys().all(|key| {
999 matches!(
1000 key.as_str(),
1001 "operation" | "op" | "component" | "component_ref"
1002 )
1003 });
1004 if legacy_only {
1005 return (input, config);
1006 }
1007 }
1008 (payload, Value::Null)
1009}
1010
1011fn resolve_component_operation(
1012 node_id: &str,
1013 component_label: &str,
1014 payload_operation: Option<String>,
1015 operation_override: Option<&str>,
1016 operation_in_mapping: Option<&str>,
1017) -> Result<String> {
1018 if let Some(op) = operation_override
1019 .map(str::trim)
1020 .filter(|value| !value.is_empty())
1021 {
1022 return Ok(op.to_string());
1023 }
1024
1025 if let Some(op) = payload_operation
1026 .as_deref()
1027 .map(str::trim)
1028 .filter(|value| !value.is_empty())
1029 {
1030 return Ok(op.to_string());
1031 }
1032
1033 let mut message = format!(
1034 "missing operation for node `{}` (component `{}`); expected node.component.operation to be set",
1035 node_id, component_label,
1036 );
1037 if let Some(found) = operation_in_mapping {
1038 message.push_str(&format!(
1039 ". Found operation in input.mapping (`{}`) but this is not used; pack compiler must preserve node.component.operation.",
1040 found
1041 ));
1042 }
1043 bail!(message);
1044}
1045
1046fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
1047 match component_ref {
1048 "emit.log" => EmitKind::Log,
1049 "emit.response" => EmitKind::Response,
1050 other => EmitKind::Other(other.to_string()),
1051 }
1052}
1053
1054fn emit_ref_from_kind(kind: &EmitKind) -> String {
1055 match kind {
1056 EmitKind::Log => "emit.log".to_string(),
1057 EmitKind::Response => "emit.response".to_string(),
1058 EmitKind::Other(other) => other.clone(),
1059 }
1060}
1061
1062#[cfg(test)]
1063mod tests {
1064 use super::*;
1065 use greentic_types::{
1066 Flow, FlowComponentRef, FlowId, FlowKind, InputMapping, Node, NodeId, OutputMapping,
1067 Routing, TelemetryHints,
1068 };
1069 use serde_json::json;
1070 use std::collections::BTreeMap;
1071 use std::str::FromStr;
1072 use std::sync::Mutex;
1073 use tokio::runtime::Runtime;
1074
1075 fn minimal_engine() -> FlowEngine {
1076 FlowEngine {
1077 packs: Vec::new(),
1078 flows: Vec::new(),
1079 flow_sources: HashMap::new(),
1080 flow_cache: RwLock::new(HashMap::new()),
1081 default_env: "local".to_string(),
1082 }
1083 }
1084
1085 #[test]
1086 fn templating_renders_with_partials_and_data() {
1087 let mut state = ExecutionState::new(json!({ "city": "London" }));
1088 state.nodes.insert(
1089 "forecast".to_string(),
1090 NodeOutput::new(json!({ "temp": "20C" })),
1091 );
1092
1093 let ctx = state.context();
1095 assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
1096 }
1097
1098 #[test]
1099 fn finalize_wraps_emitted_payloads() {
1100 let mut state = ExecutionState::new(json!({}));
1101 state.push_egress(json!({ "text": "first" }));
1102 state.push_egress(json!({ "text": "second" }));
1103 let result = state.finalize_with(Some(json!({ "text": "final" })));
1104 assert_eq!(
1105 result,
1106 json!([
1107 { "text": "first" },
1108 { "text": "second" },
1109 { "text": "final" }
1110 ])
1111 );
1112 }
1113
1114 #[test]
1115 fn finalize_flattens_final_array() {
1116 let mut state = ExecutionState::new(json!({}));
1117 state.push_egress(json!({ "text": "only" }));
1118 let result = state.finalize_with(Some(json!([
1119 { "text": "extra-1" },
1120 { "text": "extra-2" }
1121 ])));
1122 assert_eq!(
1123 result,
1124 json!([
1125 { "text": "only" },
1126 { "text": "extra-1" },
1127 { "text": "extra-2" }
1128 ])
1129 );
1130 }
1131
1132 #[test]
1133 fn missing_operation_reports_node_and_component() {
1134 let engine = minimal_engine();
1135 let rt = Runtime::new().unwrap();
1136 let retry_config = RetryConfig {
1137 max_attempts: 1,
1138 base_delay_ms: 1,
1139 };
1140 let ctx = FlowContext {
1141 tenant: "tenant",
1142 flow_id: "flow",
1143 node_id: Some("missing-op"),
1144 tool: None,
1145 action: None,
1146 session_id: None,
1147 provider_id: None,
1148 retry_config,
1149 observer: None,
1150 mocks: None,
1151 };
1152 let node = HostNode {
1153 kind: NodeKind::Exec {
1154 target_component: "qa.process".into(),
1155 },
1156 component: "component.exec".into(),
1157 component_id: "component.exec".into(),
1158 operation_name: None,
1159 operation_in_mapping: None,
1160 payload_expr: Value::Null,
1161 routing: Routing::End,
1162 };
1163 let state = ExecutionState::new(Value::Null);
1164 let payload = json!({ "component": "qa.process" });
1165 let err = rt
1166 .block_on(engine.execute_component_exec(
1167 &ctx,
1168 "missing-op",
1169 &node,
1170 &state,
1171 payload,
1172 ComponentOverrides {
1173 component: None,
1174 operation: None,
1175 },
1176 ))
1177 .unwrap_err();
1178 let message = err.to_string();
1179 assert!(
1180 message.contains("missing operation for node `missing-op`"),
1181 "unexpected message: {message}"
1182 );
1183 assert!(
1184 message.contains("(component `component.exec`)"),
1185 "unexpected message: {message}"
1186 );
1187 }
1188
1189 #[test]
1190 fn missing_operation_mentions_mapping_hint() {
1191 let engine = minimal_engine();
1192 let rt = Runtime::new().unwrap();
1193 let retry_config = RetryConfig {
1194 max_attempts: 1,
1195 base_delay_ms: 1,
1196 };
1197 let ctx = FlowContext {
1198 tenant: "tenant",
1199 flow_id: "flow",
1200 node_id: Some("missing-op-hint"),
1201 tool: None,
1202 action: None,
1203 session_id: None,
1204 provider_id: None,
1205 retry_config,
1206 observer: None,
1207 mocks: None,
1208 };
1209 let node = HostNode {
1210 kind: NodeKind::Exec {
1211 target_component: "qa.process".into(),
1212 },
1213 component: "component.exec".into(),
1214 component_id: "component.exec".into(),
1215 operation_name: None,
1216 operation_in_mapping: Some("render".into()),
1217 payload_expr: Value::Null,
1218 routing: Routing::End,
1219 };
1220 let state = ExecutionState::new(Value::Null);
1221 let payload = json!({ "component": "qa.process" });
1222 let err = rt
1223 .block_on(engine.execute_component_exec(
1224 &ctx,
1225 "missing-op-hint",
1226 &node,
1227 &state,
1228 payload,
1229 ComponentOverrides {
1230 component: None,
1231 operation: None,
1232 },
1233 ))
1234 .unwrap_err();
1235 let message = err.to_string();
1236 assert!(
1237 message.contains("missing operation for node `missing-op-hint`"),
1238 "unexpected message: {message}"
1239 );
1240 assert!(
1241 message.contains("Found operation in input.mapping (`render`)"),
1242 "unexpected message: {message}"
1243 );
1244 }
1245
1246 struct CountingObserver {
1247 starts: Mutex<Vec<String>>,
1248 ends: Mutex<Vec<Value>>,
1249 }
1250
1251 impl CountingObserver {
1252 fn new() -> Self {
1253 Self {
1254 starts: Mutex::new(Vec::new()),
1255 ends: Mutex::new(Vec::new()),
1256 }
1257 }
1258 }
1259
1260 impl ExecutionObserver for CountingObserver {
1261 fn on_node_start(&self, event: &NodeEvent<'_>) {
1262 self.starts.lock().unwrap().push(event.node_id.to_string());
1263 }
1264
1265 fn on_node_end(&self, _event: &NodeEvent<'_>, output: &Value) {
1266 self.ends.lock().unwrap().push(output.clone());
1267 }
1268
1269 fn on_node_error(&self, _event: &NodeEvent<'_>, _error: &dyn StdError) {}
1270 }
1271
1272 #[test]
1273 fn emits_end_event_for_successful_node() {
1274 let node_id = NodeId::from_str("emit").unwrap();
1275 let node = Node {
1276 id: node_id.clone(),
1277 component: FlowComponentRef {
1278 id: "emit.log".parse().unwrap(),
1279 pack_alias: None,
1280 operation: None,
1281 },
1282 input: InputMapping {
1283 mapping: json!({ "message": "logged" }),
1284 },
1285 output: OutputMapping {
1286 mapping: Value::Null,
1287 },
1288 routing: Routing::End,
1289 telemetry: TelemetryHints::default(),
1290 };
1291 let mut nodes = indexmap::IndexMap::default();
1292 nodes.insert(node_id.clone(), node);
1293 let flow = Flow {
1294 schema_version: "1.0".into(),
1295 id: FlowId::from_str("emit.flow").unwrap(),
1296 kind: FlowKind::Messaging,
1297 entrypoints: BTreeMap::from([(
1298 "default".to_string(),
1299 Value::String(node_id.to_string()),
1300 )]),
1301 nodes,
1302 metadata: Default::default(),
1303 };
1304 let host_flow = HostFlow::from(flow);
1305
1306 let engine = FlowEngine {
1307 packs: Vec::new(),
1308 flows: Vec::new(),
1309 flow_sources: HashMap::new(),
1310 flow_cache: RwLock::new(HashMap::from([("emit.flow".to_string(), host_flow)])),
1311 default_env: "local".to_string(),
1312 };
1313 let observer = CountingObserver::new();
1314 let ctx = FlowContext {
1315 tenant: "demo",
1316 flow_id: "emit.flow",
1317 node_id: None,
1318 tool: None,
1319 action: None,
1320 session_id: None,
1321 provider_id: None,
1322 retry_config: RetryConfig {
1323 max_attempts: 1,
1324 base_delay_ms: 1,
1325 },
1326 observer: Some(&observer),
1327 mocks: None,
1328 };
1329
1330 let rt = Runtime::new().unwrap();
1331 let result = rt.block_on(engine.execute(ctx, Value::Null)).unwrap();
1332 assert!(matches!(result.status, FlowStatus::Completed));
1333
1334 let starts = observer.starts.lock().unwrap();
1335 let ends = observer.ends.lock().unwrap();
1336 assert_eq!(starts.len(), 1);
1337 assert_eq!(ends.len(), 1);
1338 assert_eq!(ends[0], json!({ "message": "logged" }));
1339 }
1340}
1341
1342use tracing::Instrument;
1343
1344pub struct FlowContext<'a> {
1345 pub tenant: &'a str,
1346 pub flow_id: &'a str,
1347 pub node_id: Option<&'a str>,
1348 pub tool: Option<&'a str>,
1349 pub action: Option<&'a str>,
1350 pub session_id: Option<&'a str>,
1351 pub provider_id: Option<&'a str>,
1352 pub retry_config: RetryConfig,
1353 pub observer: Option<&'a dyn ExecutionObserver>,
1354 pub mocks: Option<&'a MockLayer>,
1355}
1356
1357#[derive(Copy, Clone)]
1358pub struct RetryConfig {
1359 pub max_attempts: u32,
1360 pub base_delay_ms: u64,
1361}
1362
1363fn should_retry(err: &anyhow::Error) -> bool {
1364 let lower = err.to_string().to_lowercase();
1365 lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
1366}
1367
1368impl From<FlowRetryConfig> for RetryConfig {
1369 fn from(value: FlowRetryConfig) -> Self {
1370 Self {
1371 max_attempts: value.max_attempts.max(1),
1372 base_delay_ms: value.base_delay_ms.max(50),
1373 }
1374 }
1375}