1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use super::templating::{TemplateOptions, render_template_value};
18use crate::config::{FlowRetryConfig, HostConfig};
19use crate::pack::{FlowDescriptor, PackRuntime};
20use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
21use greentic_types::{Flow, Node, NodeId, Routing};
22
/// Executes flows provided by a set of packs.
///
/// Keeps the pack runtimes, the descriptor of every registered flow, and a
/// cache of flows already converted into their host representation.
pub struct FlowEngine {
    /// Pack runtimes; `flow_sources` stores indices into this vector.
    packs: Vec<Arc<PackRuntime>>,
    /// Descriptors of the registered flows (at most one per flow id).
    flows: Vec<FlowDescriptor>,
    /// Maps a flow id to the index in `packs` of the pack that provides it.
    flow_sources: HashMap<String, usize>,
    /// Host flows keyed by flow id; filled in `new` and lazily by `get_or_load_flow`.
    flow_cache: RwLock<HashMap<String, HostFlow>>,
    /// Environment name handed to `set_flow_context`; taken from `GREENTIC_ENV`,
    /// falling back to `"local"`.
    default_env: String,
}
30
/// Serializable state captured when a flow pauses, used to resume it later.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FlowSnapshot {
    /// Id of the paused flow; `FlowEngine::resume` rejects a snapshot whose id
    /// differs from the requested flow.
    pub flow_id: String,
    /// Id of the node at which execution continues on resume.
    pub next_node: String,
    /// Execution state at the time of the pause.
    pub state: ExecutionState,
}
37
/// Describes why a flow paused and carries what is needed to resume it.
#[derive(Clone, Debug)]
pub struct FlowWait {
    /// Optional human-readable reason extracted from the wait node's payload.
    pub reason: Option<String>,
    /// Snapshot to pass back to `FlowEngine::resume`.
    pub snapshot: FlowSnapshot,
}
43
/// Terminal status of a single engine run.
#[derive(Clone, Debug)]
pub enum FlowStatus {
    /// The flow ran until a node with no successor.
    Completed,
    /// The flow paused at a wait node; boxed to keep the enum small.
    Waiting(Box<FlowWait>),
}
49
/// Result of executing (or resuming) a flow.
#[derive(Clone, Debug)]
pub struct FlowExecution {
    /// Value produced by the run, as computed by `ExecutionState::finalize_with`.
    pub output: Value,
    /// Whether the flow completed or is waiting to be resumed.
    pub status: FlowStatus,
}
55
/// Host-side representation of a flow, converted from `greentic_types::Flow`.
#[derive(Clone, Debug)]
struct HostFlow {
    /// Flow id as a plain string (used in error and log messages).
    id: String,
    /// Node execution starts at, when one could be determined.
    start: Option<NodeId>,
    /// Nodes of the flow keyed by node id.
    nodes: IndexMap<NodeId, HostNode>,
}
62
/// Host-side representation of a single flow node.
#[derive(Clone, Debug)]
pub struct HostNode {
    /// How the engine dispatches this node.
    kind: NodeKind,
    /// Display label derived from `kind` (e.g. `component.exec`, `flow.call`).
    pub component: String,
    /// Component id used in "missing operation" diagnostics.
    component_id: String,
    /// Operation declared on the node's component, if any.
    operation_name: Option<String>,
    /// Operation found inside the input mapping; only used as a diagnostic hint.
    operation_in_mapping: Option<String>,
    /// Template rendered against the execution state to produce the node input.
    payload_expr: Value,
    /// Where execution goes after this node.
    routing: Routing,
}
74
/// Dispatch category of a node, decided once when the flow is converted.
#[derive(Clone, Debug)]
enum NodeKind {
    /// `component.exec` node that invokes `target_component`.
    Exec { target_component: String },
    /// Direct invocation of a pack component by reference.
    PackComponent { component_ref: String },
    /// `provider.invoke` node.
    ProviderInvoke,
    /// `flow.call` node that runs another flow to completion.
    FlowCall,
    /// `emit.*` node handled by the engine itself (payload appended to egress).
    BuiltinEmit { kind: EmitKind },
    /// `session.wait` node that pauses the flow.
    Wait,
}
84
/// Flavour of a builtin `emit.*` node.
#[derive(Clone, Debug)]
enum EmitKind {
    /// `emit.log`
    Log,
    /// `emit.response`
    Response,
    /// Any other `emit.*` reference, stored verbatim.
    Other(String),
}
91
/// Node-level values that take precedence over what the rendered payload says.
struct ComponentOverrides<'a> {
    /// Component reference to use instead of the payload's `component` field.
    component: Option<&'a str>,
    /// Operation to use instead of the payload's `operation` field.
    operation: Option<&'a str>,
}
96
/// Fully resolved component invocation, ready to be handed to a pack.
struct ComponentCall {
    /// Reference of the component to invoke.
    component_ref: String,
    /// Operation to run on the component.
    operation: String,
    /// Input value; serialized to a JSON string before the call.
    input: Value,
    /// Configuration value; omitted from the call when it is JSON `null`.
    config: Value,
}
103
104impl FlowExecution {
105 fn completed(output: Value) -> Self {
106 Self {
107 output,
108 status: FlowStatus::Completed,
109 }
110 }
111
112 fn waiting(output: Value, wait: FlowWait) -> Self {
113 Self {
114 output,
115 status: FlowStatus::Waiting(Box::new(wait)),
116 }
117 }
118}
119
120impl FlowEngine {
121 pub async fn new(packs: Vec<Arc<PackRuntime>>, _config: Arc<HostConfig>) -> Result<Self> {
122 let mut flow_sources = HashMap::new();
123 let mut descriptors = Vec::new();
124 for (idx, pack) in packs.iter().enumerate() {
125 let flows = pack.list_flows().await?;
126 for flow in flows {
127 tracing::info!(
128 flow_id = %flow.id,
129 flow_type = %flow.flow_type,
130 pack_index = idx,
131 "registered flow"
132 );
133 flow_sources.insert(flow.id.clone(), idx);
134 descriptors.retain(|existing: &FlowDescriptor| existing.id != flow.id);
135 descriptors.push(flow);
136 }
137 }
138
139 let mut flow_map = HashMap::new();
140 for flow in &descriptors {
141 if let Some(&pack_idx) = flow_sources.get(&flow.id) {
142 let pack_clone = Arc::clone(&packs[pack_idx]);
143 let flow_id = flow.id.clone();
144 let task_flow_id = flow_id.clone();
145 match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
146 Ok(Ok(flow)) => {
147 flow_map.insert(flow_id, HostFlow::from(flow));
148 }
149 Ok(Err(err)) => {
150 tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
151 }
152 Err(err) => {
153 tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
154 }
155 }
156 }
157 }
158
159 Ok(Self {
160 packs,
161 flows: descriptors,
162 flow_sources,
163 flow_cache: RwLock::new(flow_map),
164 default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
165 })
166 }
167
168 async fn get_or_load_flow(&self, flow_id: &str) -> Result<HostFlow> {
169 if let Some(flow) = self.flow_cache.read().get(flow_id).cloned() {
170 return Ok(flow);
171 }
172
173 let pack_idx = *self
174 .flow_sources
175 .get(flow_id)
176 .with_context(|| format!("flow {flow_id} not registered"))?;
177 let pack = Arc::clone(&self.packs[pack_idx]);
178 let flow_id_owned = flow_id.to_string();
179 let task_flow_id = flow_id_owned.clone();
180 let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
181 .await
182 .context("failed to join flow metadata task")??;
183 let host_flow = HostFlow::from(flow);
184 self.flow_cache
185 .write()
186 .insert(flow_id_owned.clone(), host_flow.clone());
187 Ok(host_flow)
188 }
189
190 pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
191 let span = tracing::info_span!(
192 "flow.execute",
193 tenant = tracing::field::Empty,
194 flow_id = tracing::field::Empty,
195 node_id = tracing::field::Empty,
196 tool = tracing::field::Empty,
197 action = tracing::field::Empty
198 );
199 annotate_span(
200 &span,
201 &FlowSpanAttributes {
202 tenant: ctx.tenant,
203 flow_id: ctx.flow_id,
204 node_id: ctx.node_id,
205 tool: ctx.tool,
206 action: ctx.action,
207 },
208 );
209 set_flow_context(
210 &self.default_env,
211 ctx.tenant,
212 ctx.flow_id,
213 ctx.node_id,
214 ctx.provider_id,
215 ctx.session_id,
216 );
217 let retry_config = ctx.retry_config;
218 let original_input = input;
219 async move {
220 let mut attempt = 0u32;
221 loop {
222 attempt += 1;
223 match self.execute_once(&ctx, original_input.clone()).await {
224 Ok(value) => return Ok(value),
225 Err(err) => {
226 if attempt >= retry_config.max_attempts || !should_retry(&err) {
227 return Err(err);
228 }
229 let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
230 tracing::warn!(
231 tenant = ctx.tenant,
232 flow_id = ctx.flow_id,
233 attempt,
234 max_attempts = retry_config.max_attempts,
235 delay_ms = delay,
236 error = %err,
237 "transient flow execution failure, backing off"
238 );
239 tokio::time::sleep(Duration::from_millis(delay)).await;
240 }
241 }
242 }
243 }
244 .instrument(span)
245 .await
246 }
247
248 pub async fn resume(
249 &self,
250 ctx: FlowContext<'_>,
251 snapshot: FlowSnapshot,
252 input: Value,
253 ) -> Result<FlowExecution> {
254 if snapshot.flow_id != ctx.flow_id {
255 bail!(
256 "snapshot flow {} does not match requested {}",
257 snapshot.flow_id,
258 ctx.flow_id
259 );
260 }
261 let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
262 let mut state = snapshot.state;
263 state.replace_input(input);
264 state.ensure_entry();
265 self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
266 .await
267 }
268
269 async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
270 let flow_ir = self.get_or_load_flow(ctx.flow_id).await?;
271 let state = ExecutionState::new(input);
272 self.drive_flow(ctx, flow_ir, state, None).await
273 }
274
275 async fn drive_flow(
276 &self,
277 ctx: &FlowContext<'_>,
278 flow_ir: HostFlow,
279 mut state: ExecutionState,
280 resume_from: Option<String>,
281 ) -> Result<FlowExecution> {
282 let mut current = match resume_from {
283 Some(node) => NodeId::from_str(&node)
284 .with_context(|| format!("invalid resume node id `{node}`"))?,
285 None => flow_ir
286 .start
287 .clone()
288 .or_else(|| flow_ir.nodes.keys().next().cloned())
289 .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
290 };
291
292 loop {
293 let node = flow_ir
294 .nodes
295 .get(¤t)
296 .with_context(|| format!("node {} not found", current.as_str()))?;
297
298 let payload_template = node.payload_expr.clone();
299 let prev = state
300 .last_output
301 .as_ref()
302 .cloned()
303 .unwrap_or_else(|| Value::Object(JsonMap::new()));
304 let ctx_value = template_context(&state, prev);
305 let payload =
306 render_template_value(&payload_template, &ctx_value, TemplateOptions::default())
307 .context("failed to render node input template")?;
308 let observed_payload = payload.clone();
309 let node_id = current.clone();
310 let event = NodeEvent {
311 context: ctx,
312 node_id: node_id.as_str(),
313 node,
314 payload: &observed_payload,
315 };
316 if let Some(observer) = ctx.observer {
317 observer.on_node_start(&event);
318 }
319 let DispatchOutcome {
320 output,
321 wait_reason,
322 } = self
323 .dispatch_node(ctx, node_id.as_str(), node, &mut state, payload)
324 .await?;
325
326 state.nodes.insert(node_id.clone().into(), output.clone());
327 state.last_output = Some(output.payload.clone());
328 if let Some(observer) = ctx.observer {
329 observer.on_node_end(&event, &output.payload);
330 }
331
332 let (next, should_exit) = match &node.routing {
333 Routing::Next { node_id } => (Some(node_id.clone()), false),
334 Routing::End | Routing::Reply => (None, true),
335 Routing::Branch { default, .. } => (default.clone(), default.is_none()),
336 Routing::Custom(raw) => {
337 tracing::warn!(
338 flow_id = %flow_ir.id,
339 node_id = %node_id,
340 routing = ?raw,
341 "unsupported routing; terminating flow"
342 );
343 (None, true)
344 }
345 };
346
347 if let Some(wait_reason) = wait_reason {
348 let resume_target = next.clone().ok_or_else(|| {
349 anyhow!(
350 "session.wait node {} requires a non-empty route",
351 current.as_str()
352 )
353 })?;
354 let mut snapshot_state = state.clone();
355 snapshot_state.clear_egress();
356 let snapshot = FlowSnapshot {
357 flow_id: ctx.flow_id.to_string(),
358 next_node: resume_target.as_str().to_string(),
359 state: snapshot_state,
360 };
361 let output_value = state.clone().finalize_with(None);
362 return Ok(FlowExecution::waiting(
363 output_value,
364 FlowWait {
365 reason: Some(wait_reason),
366 snapshot,
367 },
368 ));
369 }
370
371 if should_exit {
372 return Ok(FlowExecution::completed(
373 state.finalize_with(Some(output.payload.clone())),
374 ));
375 }
376
377 match next {
378 Some(n) => current = n,
379 None => {
380 return Ok(FlowExecution::completed(
381 state.finalize_with(Some(output.payload.clone())),
382 ));
383 }
384 }
385 }
386 }
387
388 async fn dispatch_node(
389 &self,
390 ctx: &FlowContext<'_>,
391 node_id: &str,
392 node: &HostNode,
393 state: &mut ExecutionState,
394 payload: Value,
395 ) -> Result<DispatchOutcome> {
396 match &node.kind {
397 NodeKind::Exec { target_component } => self
398 .execute_component_exec(
399 ctx,
400 node_id,
401 node,
402 payload,
403 ComponentOverrides {
404 component: Some(target_component.as_str()),
405 operation: node.operation_name.as_deref(),
406 },
407 )
408 .await
409 .map(DispatchOutcome::complete),
410 NodeKind::PackComponent { component_ref } => self
411 .execute_component_call(ctx, node_id, node, payload, component_ref.as_str())
412 .await
413 .map(DispatchOutcome::complete),
414 NodeKind::FlowCall => self
415 .execute_flow_call(ctx, payload)
416 .await
417 .map(DispatchOutcome::complete),
418 NodeKind::ProviderInvoke => self
419 .execute_provider_invoke(ctx, node_id, state, payload)
420 .await
421 .map(DispatchOutcome::complete),
422 NodeKind::BuiltinEmit { kind } => {
423 match kind {
424 EmitKind::Log | EmitKind::Response => {}
425 EmitKind::Other(component) => {
426 tracing::debug!(%component, "handling emit.* as builtin");
427 }
428 }
429 state.push_egress(payload.clone());
430 Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
431 }
432 NodeKind::Wait => {
433 let reason = extract_wait_reason(&payload);
434 Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
435 }
436 }
437 }
438
439 async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
440 #[derive(Deserialize)]
441 struct FlowCallPayload {
442 #[serde(alias = "flow")]
443 flow_id: String,
444 #[serde(default)]
445 input: Value,
446 }
447
448 let call: FlowCallPayload =
449 serde_json::from_value(payload).context("invalid payload for flow.call node")?;
450 if call.flow_id.trim().is_empty() {
451 bail!("flow.call requires a non-empty flow_id");
452 }
453
454 let sub_input = if call.input.is_null() {
455 Value::Null
456 } else {
457 call.input
458 };
459
460 let flow_id_owned = call.flow_id;
461 let action = "flow.call";
462 let sub_ctx = FlowContext {
463 tenant: ctx.tenant,
464 flow_id: flow_id_owned.as_str(),
465 node_id: None,
466 tool: ctx.tool,
467 action: Some(action),
468 session_id: ctx.session_id,
469 provider_id: ctx.provider_id,
470 retry_config: ctx.retry_config,
471 observer: ctx.observer,
472 mocks: ctx.mocks,
473 };
474
475 let execution = Box::pin(self.execute(sub_ctx, sub_input))
476 .await
477 .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
478 match execution.status {
479 FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
480 FlowStatus::Waiting(wait) => bail!(
481 "flow.call cannot pause (flow {} waiting {:?})",
482 flow_id_owned,
483 wait.reason
484 ),
485 }
486 }
487
488 async fn execute_component_exec(
489 &self,
490 ctx: &FlowContext<'_>,
491 node_id: &str,
492 node: &HostNode,
493 payload: Value,
494 overrides: ComponentOverrides<'_>,
495 ) -> Result<NodeOutput> {
496 #[derive(Deserialize)]
497 struct ComponentPayload {
498 #[serde(default, alias = "component_ref", alias = "component")]
499 component: Option<String>,
500 #[serde(alias = "op")]
501 operation: Option<String>,
502 #[serde(default)]
503 input: Value,
504 #[serde(default)]
505 config: Value,
506 }
507
508 let payload: ComponentPayload =
509 serde_json::from_value(payload).context("invalid payload for component.exec")?;
510 let component_ref = overrides
511 .component
512 .map(str::to_string)
513 .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
514 .with_context(|| "component.exec requires a component_ref")?;
515 let operation = resolve_component_operation(
516 node_id,
517 node.component_id.as_str(),
518 payload.operation,
519 overrides.operation,
520 node.operation_in_mapping.as_deref(),
521 )?;
522 let call = ComponentCall {
523 component_ref,
524 operation,
525 input: payload.input,
526 config: payload.config,
527 };
528
529 self.invoke_component_call(ctx, node_id, call).await
530 }
531
532 async fn execute_component_call(
533 &self,
534 ctx: &FlowContext<'_>,
535 node_id: &str,
536 node: &HostNode,
537 payload: Value,
538 component_ref: &str,
539 ) -> Result<NodeOutput> {
540 let payload_operation = extract_operation_from_mapping(&payload);
541 let (input, config) = split_operation_payload(payload);
542 let operation = resolve_component_operation(
543 node_id,
544 node.component_id.as_str(),
545 payload_operation,
546 node.operation_name.as_deref(),
547 node.operation_in_mapping.as_deref(),
548 )?;
549 let call = ComponentCall {
550 component_ref: component_ref.to_string(),
551 operation,
552 input,
553 config,
554 };
555 self.invoke_component_call(ctx, node_id, call).await
556 }
557
558 async fn invoke_component_call(
559 &self,
560 ctx: &FlowContext<'_>,
561 node_id: &str,
562 call: ComponentCall,
563 ) -> Result<NodeOutput> {
564 let input_json = serde_json::to_string(&call.input)?;
565 let config_json = if call.config.is_null() {
566 None
567 } else {
568 Some(serde_json::to_string(&call.config)?)
569 };
570
571 let pack_idx = *self
572 .flow_sources
573 .get(ctx.flow_id)
574 .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
575 let pack = Arc::clone(&self.packs[pack_idx]);
576 let exec_ctx = component_exec_ctx(ctx, node_id);
577 let value = pack
578 .invoke_component(
579 call.component_ref.as_str(),
580 exec_ctx,
581 call.operation.as_str(),
582 config_json,
583 input_json,
584 )
585 .await?;
586
587 Ok(NodeOutput::new(value))
588 }
589
590 async fn execute_provider_invoke(
591 &self,
592 ctx: &FlowContext<'_>,
593 node_id: &str,
594 state: &ExecutionState,
595 payload: Value,
596 ) -> Result<NodeOutput> {
597 #[derive(Deserialize)]
598 struct ProviderPayload {
599 #[serde(default)]
600 provider_id: Option<String>,
601 #[serde(default)]
602 provider_type: Option<String>,
603 #[serde(default, alias = "operation")]
604 op: Option<String>,
605 #[serde(default)]
606 input: Value,
607 #[serde(default)]
608 in_map: Value,
609 #[serde(default)]
610 out_map: Value,
611 #[serde(default)]
612 err_map: Value,
613 }
614
615 let payload: ProviderPayload =
616 serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
617 let op = payload
618 .op
619 .as_deref()
620 .filter(|v| !v.trim().is_empty())
621 .with_context(|| "provider.invoke requires an op")?
622 .to_string();
623
624 let prev = state
625 .last_output
626 .as_ref()
627 .cloned()
628 .unwrap_or_else(|| Value::Object(JsonMap::new()));
629 let base_ctx = template_context(state, prev);
630
631 let input_value = if !payload.in_map.is_null() {
632 let mut ctx_value = base_ctx.clone();
633 if let Value::Object(ref mut map) = ctx_value {
634 map.insert("input".into(), payload.input.clone());
635 map.insert("result".into(), payload.input.clone());
636 }
637 render_template_value(
638 &payload.in_map,
639 &ctx_value,
640 TemplateOptions {
641 allow_pointer: true,
642 },
643 )
644 .context("failed to render provider.invoke in_map")?
645 } else if !payload.input.is_null() {
646 payload.input
647 } else {
648 Value::Null
649 };
650 let input_json = serde_json::to_vec(&input_value)?;
651
652 let pack_idx = *self
653 .flow_sources
654 .get(ctx.flow_id)
655 .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
656 let pack = Arc::clone(&self.packs[pack_idx]);
657 let binding = pack.resolve_provider(
658 payload.provider_id.as_deref(),
659 payload.provider_type.as_deref(),
660 )?;
661 let exec_ctx = component_exec_ctx(ctx, node_id);
662 let result = pack
663 .invoke_provider(&binding, exec_ctx, &op, input_json)
664 .await?;
665
666 let output = if payload.out_map.is_null() {
667 result
668 } else {
669 let mut ctx_value = base_ctx;
670 if let Value::Object(ref mut map) = ctx_value {
671 map.insert("input".into(), result.clone());
672 map.insert("result".into(), result.clone());
673 }
674 render_template_value(
675 &payload.out_map,
676 &ctx_value,
677 TemplateOptions {
678 allow_pointer: true,
679 },
680 )
681 .context("failed to render provider.invoke out_map")?
682 };
683 let _ = payload.err_map;
684 Ok(NodeOutput::new(output))
685 }
686
687 pub fn flows(&self) -> &[FlowDescriptor] {
688 &self.flows
689 }
690
691 pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
692 self.flows
693 .iter()
694 .find(|descriptor| descriptor.flow_type == flow_type)
695 }
696
697 pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
698 self.flows
699 .iter()
700 .find(|descriptor| descriptor.id == flow_id)
701 }
702}
703
/// Hooks for observing node execution; implementors must be `Send + Sync`.
pub trait ExecutionObserver: Send + Sync {
    /// Called after a node's input has been rendered, before it is dispatched.
    fn on_node_start(&self, event: &NodeEvent<'_>);
    /// Called after a node produced `output` and the state was updated.
    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
    /// Intended for reporting a node failure.
    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
}
709
/// Borrowed view of a node execution handed to `ExecutionObserver` hooks.
pub struct NodeEvent<'a> {
    /// Context of the flow run the node belongs to.
    pub context: &'a FlowContext<'a>,
    /// Id of the node being executed.
    pub node_id: &'a str,
    /// The node definition.
    pub node: &'a HostNode,
    /// Rendered input payload of the node.
    pub payload: &'a Value,
}
716
/// Mutable state of a flow run; serializable so it can be stored in a snapshot.
///
/// Every field defaults when absent from the serialized form.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionState {
    /// Input the run originally started with.
    #[serde(default)]
    entry: Value,
    /// Current input; replaced when the flow is resumed.
    #[serde(default)]
    input: Value,
    /// Output of every node executed so far, keyed by node id.
    #[serde(default)]
    nodes: HashMap<String, NodeOutput>,
    /// Payloads emitted by builtin `emit.*` nodes, in emission order.
    #[serde(default)]
    egress: Vec<Value>,
    /// Payload of the most recently executed node; omitted from the serialized
    /// form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    last_output: Option<Value>,
}
730
731impl ExecutionState {
732 fn new(input: Value) -> Self {
733 Self {
734 entry: input.clone(),
735 input,
736 nodes: HashMap::new(),
737 egress: Vec::new(),
738 last_output: None,
739 }
740 }
741
742 fn ensure_entry(&mut self) {
743 if self.entry.is_null() {
744 self.entry = self.input.clone();
745 }
746 }
747
748 fn context(&self) -> Value {
749 let mut nodes = JsonMap::new();
750 for (id, output) in &self.nodes {
751 nodes.insert(
752 id.clone(),
753 json!({
754 "ok": output.ok,
755 "payload": output.payload.clone(),
756 "meta": output.meta.clone(),
757 }),
758 );
759 }
760 json!({
761 "entry": self.entry.clone(),
762 "input": self.input.clone(),
763 "nodes": nodes,
764 })
765 }
766
767 fn outputs_map(&self) -> JsonMap<String, Value> {
768 let mut outputs = JsonMap::new();
769 for (id, output) in &self.nodes {
770 outputs.insert(id.clone(), output.payload.clone());
771 }
772 outputs
773 }
774 fn push_egress(&mut self, payload: Value) {
775 self.egress.push(payload);
776 }
777
778 fn replace_input(&mut self, input: Value) {
779 self.input = input;
780 }
781
782 fn clear_egress(&mut self) {
783 self.egress.clear();
784 }
785
786 fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
787 if self.egress.is_empty() {
788 return final_payload.unwrap_or(Value::Null);
789 }
790 let mut emitted = std::mem::take(&mut self.egress);
791 if let Some(value) = final_payload {
792 match value {
793 Value::Null => {}
794 Value::Array(items) => emitted.extend(items),
795 other => emitted.push(other),
796 }
797 }
798 Value::Array(emitted)
799 }
800}
801
/// Recorded result of a single node.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct NodeOutput {
    /// Success flag; `NodeOutput::new` always sets it to `true`.
    ok: bool,
    /// Value produced by the node.
    payload: Value,
    /// Extra metadata; `NodeOutput::new` sets it to JSON `null`.
    meta: Value,
}
808
809impl NodeOutput {
810 fn new(payload: Value) -> Self {
811 Self {
812 ok: true,
813 payload,
814 meta: Value::Null,
815 }
816 }
817}
818
/// What dispatching a node produced.
struct DispatchOutcome {
    /// Output of the node.
    output: NodeOutput,
    /// `Some` when the node asked the flow to pause, carrying the reason.
    wait_reason: Option<String>,
}
823
824impl DispatchOutcome {
825 fn complete(output: NodeOutput) -> Self {
826 Self {
827 output,
828 wait_reason: None,
829 }
830 }
831
832 fn wait(output: NodeOutput, reason: Option<String>) -> Self {
833 Self {
834 output,
835 wait_reason: reason,
836 }
837 }
838}
839
840fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
841 ComponentExecCtx {
842 tenant: ComponentTenantCtx {
843 tenant: ctx.tenant.to_string(),
844 team: None,
845 user: ctx.provider_id.map(str::to_string),
846 trace_id: None,
847 correlation_id: ctx.session_id.map(str::to_string),
848 deadline_unix_ms: None,
849 attempt: 1,
850 idempotency_key: ctx.session_id.map(str::to_string),
851 },
852 flow_id: ctx.flow_id.to_string(),
853 node_id: Some(node_id.to_string()),
854 }
855}
856
857fn extract_wait_reason(payload: &Value) -> Option<String> {
858 match payload {
859 Value::String(s) => Some(s.clone()),
860 Value::Object(map) => map
861 .get("reason")
862 .and_then(Value::as_str)
863 .map(|value| value.to_string()),
864 _ => None,
865 }
866}
867
868fn template_context(state: &ExecutionState, prev: Value) -> Value {
869 let entry = if state.entry.is_null() {
870 Value::Object(JsonMap::new())
871 } else {
872 state.entry.clone()
873 };
874 let mut ctx = JsonMap::new();
875 ctx.insert("entry".into(), entry);
876 ctx.insert("prev".into(), prev);
877 ctx.insert("node".into(), Value::Object(state.outputs_map()));
878 ctx.insert("state".into(), state.context());
879 Value::Object(ctx)
880}
881
882impl From<Flow> for HostFlow {
883 fn from(value: Flow) -> Self {
884 let mut nodes = IndexMap::new();
885 for (id, node) in value.nodes {
886 nodes.insert(id.clone(), HostNode::from(node));
887 }
888 let start = value
889 .entrypoints
890 .get("default")
891 .and_then(Value::as_str)
892 .and_then(|id| NodeId::from_str(id).ok())
893 .or_else(|| nodes.keys().next().cloned());
894 Self {
895 id: value.id.as_str().to_string(),
896 start,
897 nodes,
898 }
899 }
900}
901
902impl From<Node> for HostNode {
903 fn from(node: Node) -> Self {
904 let component_ref = node.component.id.as_str().to_string();
905 let raw_operation = node.component.operation.clone();
906 let operation_in_mapping = extract_operation_from_mapping(&node.input.mapping);
907 let operation_is_component_exec = raw_operation.as_deref() == Some("component.exec");
908 let operation_is_emit = raw_operation
909 .as_deref()
910 .map(|op| op.starts_with("emit."))
911 .unwrap_or(false);
912 let is_component_exec = component_ref == "component.exec" || operation_is_component_exec;
913
914 let kind = if is_component_exec {
915 let target = if component_ref == "component.exec" {
916 if let Some(op) = raw_operation
917 .as_deref()
918 .filter(|op| op.starts_with("emit."))
919 {
920 op.to_string()
921 } else {
922 extract_target_component(&node.input.mapping)
923 .unwrap_or_else(|| "component.exec".to_string())
924 }
925 } else {
926 extract_target_component(&node.input.mapping)
927 .unwrap_or_else(|| component_ref.clone())
928 };
929 if target.starts_with("emit.") {
930 NodeKind::BuiltinEmit {
931 kind: emit_kind_from_ref(&target),
932 }
933 } else {
934 NodeKind::Exec {
935 target_component: target,
936 }
937 }
938 } else if operation_is_emit {
939 NodeKind::BuiltinEmit {
940 kind: emit_kind_from_ref(raw_operation.as_deref().unwrap_or("emit.log")),
941 }
942 } else {
943 match component_ref.as_str() {
944 "flow.call" => NodeKind::FlowCall,
945 "provider.invoke" => NodeKind::ProviderInvoke,
946 "session.wait" => NodeKind::Wait,
947 comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
948 kind: emit_kind_from_ref(comp),
949 },
950 other => NodeKind::PackComponent {
951 component_ref: other.to_string(),
952 },
953 }
954 };
955 let component_label = match &kind {
956 NodeKind::Exec { .. } => "component.exec".to_string(),
957 NodeKind::PackComponent { component_ref } => component_ref.clone(),
958 NodeKind::ProviderInvoke => "provider.invoke".to_string(),
959 NodeKind::FlowCall => "flow.call".to_string(),
960 NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
961 NodeKind::Wait => "session.wait".to_string(),
962 };
963 let operation_name = if is_component_exec && operation_is_component_exec {
964 None
965 } else {
966 raw_operation.clone()
967 };
968 let payload_expr = match kind {
969 NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
970 _ => node.input.mapping.clone(),
971 };
972 Self {
973 kind,
974 component: component_label,
975 component_id: if is_component_exec {
976 "component.exec".to_string()
977 } else {
978 component_ref
979 },
980 operation_name,
981 operation_in_mapping,
982 payload_expr,
983 routing: node.routing,
984 }
985 }
986}
987
988fn extract_target_component(payload: &Value) -> Option<String> {
989 match payload {
990 Value::Object(map) => map
991 .get("component")
992 .or_else(|| map.get("component_ref"))
993 .and_then(Value::as_str)
994 .map(|s| s.to_string()),
995 _ => None,
996 }
997}
998
999fn extract_operation_from_mapping(payload: &Value) -> Option<String> {
1000 match payload {
1001 Value::Object(map) => map
1002 .get("operation")
1003 .or_else(|| map.get("op"))
1004 .and_then(Value::as_str)
1005 .map(str::trim)
1006 .filter(|value| !value.is_empty())
1007 .map(|value| value.to_string()),
1008 _ => None,
1009 }
1010}
1011
1012fn extract_emit_payload(payload: &Value) -> Value {
1013 if let Value::Object(map) = payload {
1014 if let Some(input) = map.get("input") {
1015 return input.clone();
1016 }
1017 if let Some(inner) = map.get("payload") {
1018 return inner.clone();
1019 }
1020 }
1021 payload.clone()
1022}
1023
1024fn split_operation_payload(payload: Value) -> (Value, Value) {
1025 if let Value::Object(mut map) = payload.clone()
1026 && map.contains_key("input")
1027 {
1028 let input = map.remove("input").unwrap_or(Value::Null);
1029 let config = map.remove("config").unwrap_or(Value::Null);
1030 let legacy_only = map.keys().all(|key| {
1031 matches!(
1032 key.as_str(),
1033 "operation" | "op" | "component" | "component_ref"
1034 )
1035 });
1036 if legacy_only {
1037 return (input, config);
1038 }
1039 }
1040 (payload, Value::Null)
1041}
1042
1043fn resolve_component_operation(
1044 node_id: &str,
1045 component_label: &str,
1046 payload_operation: Option<String>,
1047 operation_override: Option<&str>,
1048 operation_in_mapping: Option<&str>,
1049) -> Result<String> {
1050 if let Some(op) = operation_override
1051 .map(str::trim)
1052 .filter(|value| !value.is_empty())
1053 {
1054 return Ok(op.to_string());
1055 }
1056
1057 if let Some(op) = payload_operation
1058 .as_deref()
1059 .map(str::trim)
1060 .filter(|value| !value.is_empty())
1061 {
1062 return Ok(op.to_string());
1063 }
1064
1065 let mut message = format!(
1066 "missing operation for node `{}` (component `{}`); expected node.component.operation to be set",
1067 node_id, component_label,
1068 );
1069 if let Some(found) = operation_in_mapping {
1070 message.push_str(&format!(
1071 ". Found operation in input.mapping (`{}`) but this is not used; pack compiler must preserve node.component.operation.",
1072 found
1073 ));
1074 }
1075 bail!(message);
1076}
1077
1078fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
1079 match component_ref {
1080 "emit.log" => EmitKind::Log,
1081 "emit.response" => EmitKind::Response,
1082 other => EmitKind::Other(other.to_string()),
1083 }
1084}
1085
1086fn emit_ref_from_kind(kind: &EmitKind) -> String {
1087 match kind {
1088 EmitKind::Log => "emit.log".to_string(),
1089 EmitKind::Response => "emit.response".to_string(),
1090 EmitKind::Other(other) => other.clone(),
1091 }
1092}
1093
1094#[cfg(test)]
1095mod tests {
1096 use super::*;
1097 use greentic_types::{
1098 Flow, FlowComponentRef, FlowId, FlowKind, InputMapping, Node, NodeId, OutputMapping,
1099 Routing, TelemetryHints,
1100 };
1101 use serde_json::json;
1102 use std::collections::BTreeMap;
1103 use std::str::FromStr;
1104 use std::sync::Mutex;
1105 use tokio::runtime::Runtime;
1106
    /// Engine with no packs, no flows and an empty cache, for tests that never
    /// reach a pack lookup.
    fn minimal_engine() -> FlowEngine {
        FlowEngine {
            packs: Vec::new(),
            flows: Vec::new(),
            flow_sources: HashMap::new(),
            flow_cache: RwLock::new(HashMap::new()),
            default_env: "local".to_string(),
        }
    }
1116
    /// A recorded node output is exposed under `nodes.<id>.payload` in the
    /// state's context view.
    #[test]
    fn templating_renders_with_partials_and_data() {
        let mut state = ExecutionState::new(json!({ "city": "London" }));
        state.nodes.insert(
            "forecast".to_string(),
            NodeOutput::new(json!({ "temp": "20C" })),
        );

        let ctx = state.context();
        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
    }
1129
    /// With queued egress, the final payload is appended after the emitted
    /// payloads and the result is an array.
    #[test]
    fn finalize_wraps_emitted_payloads() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "first" }));
        state.push_egress(json!({ "text": "second" }));
        let result = state.finalize_with(Some(json!({ "text": "final" })));
        assert_eq!(
            result,
            json!([
                { "text": "first" },
                { "text": "second" },
                { "text": "final" }
            ])
        );
    }
1145
    /// A final payload that is itself an array is appended element by element
    /// rather than nested.
    #[test]
    fn finalize_flattens_final_array() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "only" }));
        let result = state.finalize_with(Some(json!([
            { "text": "extra-1" },
            { "text": "extra-2" }
        ])));
        assert_eq!(
            result,
            json!([
                { "text": "only" },
                { "text": "extra-1" },
                { "text": "extra-2" }
            ])
        );
    }
1163
    /// When neither the node, the overrides nor the payload supply an
    /// operation, the error names the node id and the component.
    #[test]
    fn missing_operation_reports_node_and_component() {
        let engine = minimal_engine();
        let rt = Runtime::new().unwrap();
        let retry_config = RetryConfig {
            max_attempts: 1,
            base_delay_ms: 1,
        };
        let ctx = FlowContext {
            tenant: "tenant",
            flow_id: "flow",
            node_id: Some("missing-op"),
            tool: None,
            action: None,
            session_id: None,
            provider_id: None,
            retry_config,
            observer: None,
            mocks: None,
        };
        let node = HostNode {
            kind: NodeKind::Exec {
                target_component: "qa.process".into(),
            },
            component: "component.exec".into(),
            component_id: "component.exec".into(),
            operation_name: None,
            operation_in_mapping: None,
            payload_expr: Value::Null,
            routing: Routing::End,
        };
        let _state = ExecutionState::new(Value::Null);
        // The payload names a component but no operation.
        let payload = json!({ "component": "qa.process" });
        let err = rt
            .block_on(engine.execute_component_exec(
                &ctx,
                "missing-op",
                &node,
                payload,
                ComponentOverrides {
                    component: None,
                    operation: None,
                },
            ))
            .unwrap_err();
        let message = err.to_string();
        assert!(
            message.contains("missing operation for node `missing-op`"),
            "unexpected message: {message}"
        );
        assert!(
            message.contains("(component `component.exec`)"),
            "unexpected message: {message}"
        );
    }
1219
// When an operation was found only inside the input mapping, the "missing operation"
// error must carry a hint that points at it.
#[test]
fn missing_operation_mentions_mapping_hint() {
    let node_name = "missing-op-hint";
    let engine = minimal_engine();
    let runtime = Runtime::new().unwrap();

    let ctx = FlowContext {
        tenant: "tenant",
        flow_id: "flow",
        node_id: Some(node_name),
        tool: None,
        action: None,
        session_id: None,
        provider_id: None,
        retry_config: RetryConfig {
            max_attempts: 1,
            base_delay_ms: 1,
        },
        observer: None,
        mocks: None,
    };

    // No operation name on the node itself, but one recorded from the input mapping.
    let exec_node = HostNode {
        kind: NodeKind::Exec {
            target_component: String::from("qa.process"),
        },
        component: String::from("component.exec"),
        component_id: String::from("component.exec"),
        operation_name: None,
        operation_in_mapping: Some(String::from("render")),
        payload_expr: Value::Null,
        routing: Routing::End,
    };
    let _state = ExecutionState::new(Value::Null);

    let overrides = ComponentOverrides {
        component: None,
        operation: None,
    };
    let pending = engine.execute_component_exec(
        &ctx,
        node_name,
        &exec_node,
        json!({ "component": "qa.process" }),
        overrides,
    );
    let message = runtime.block_on(pending).unwrap_err().to_string();

    for fragment in [
        "missing operation for node `missing-op-hint`",
        "Found operation in input.mapping (`render`)",
    ] {
        assert!(message.contains(fragment), "unexpected message: {message}");
    }
}
1275
/// Test-only observer that records the callbacks it receives so a test can assert on them.
struct CountingObserver {
    /// Node ids seen by `on_node_start`, in call order.
    starts: Mutex<Vec<String>>,
    /// Output values seen by `on_node_end`, in call order.
    ends: Mutex<Vec<Value>>,
}
1280
1281 impl CountingObserver {
1282 fn new() -> Self {
1283 Self {
1284 starts: Mutex::new(Vec::new()),
1285 ends: Mutex::new(Vec::new()),
1286 }
1287 }
1288 }
1289
1290 impl ExecutionObserver for CountingObserver {
1291 fn on_node_start(&self, event: &NodeEvent<'_>) {
1292 self.starts.lock().unwrap().push(event.node_id.to_string());
1293 }
1294
1295 fn on_node_end(&self, _event: &NodeEvent<'_>, output: &Value) {
1296 self.ends.lock().unwrap().push(output.clone());
1297 }
1298
1299 fn on_node_error(&self, _event: &NodeEvent<'_>, _error: &dyn StdError) {}
1300 }
1301
// Runs a one-node flow and checks that the observer receives exactly one start event
// and one end event, the latter carrying the node's mapped input as its output.
#[test]
fn emits_end_event_for_successful_node() {
    let emit_id = NodeId::from_str("emit").unwrap();
    let emit_node = Node {
        id: emit_id.clone(),
        component: FlowComponentRef {
            id: "emit.log".parse().unwrap(),
            pack_alias: None,
            operation: None,
        },
        input: InputMapping {
            mapping: json!({ "message": "logged" }),
        },
        output: OutputMapping {
            mapping: Value::Null,
        },
        routing: Routing::End,
        telemetry: TelemetryHints::default(),
    };

    // The flow's single entrypoint refers to the only node by id.
    let mut entrypoints = BTreeMap::new();
    entrypoints.insert("default".to_string(), Value::String(emit_id.to_string()));
    let nodes = [(emit_id.clone(), emit_node)].into_iter().collect();
    let host_flow = HostFlow::from(Flow {
        schema_version: "1.0".into(),
        id: FlowId::from_str("emit.flow").unwrap(),
        kind: FlowKind::Messaging,
        entrypoints,
        nodes,
        metadata: Default::default(),
    });

    // Pre-populate the cache so the engine finds the flow without any packs loaded.
    let mut cached_flows = HashMap::new();
    cached_flows.insert("emit.flow".to_string(), host_flow);
    let engine = FlowEngine {
        packs: Vec::new(),
        flows: Vec::new(),
        flow_sources: HashMap::new(),
        flow_cache: RwLock::new(cached_flows),
        default_env: "local".to_string(),
    };

    let observer = CountingObserver::new();
    let ctx = FlowContext {
        tenant: "demo",
        flow_id: "emit.flow",
        node_id: None,
        tool: None,
        action: None,
        session_id: None,
        provider_id: None,
        retry_config: RetryConfig {
            max_attempts: 1,
            base_delay_ms: 1,
        },
        observer: Some(&observer),
        mocks: None,
    };

    let runtime = Runtime::new().unwrap();
    let execution = runtime.block_on(engine.execute(ctx, Value::Null)).unwrap();
    assert!(matches!(execution.status, FlowStatus::Completed));

    let seen_starts = observer.starts.lock().unwrap();
    let seen_ends = observer.ends.lock().unwrap();
    assert_eq!(seen_starts.len(), 1);
    assert_eq!(seen_ends.len(), 1);
    assert_eq!(seen_ends[0], json!({ "message": "logged" }));
}
1370}
1371
1372use tracing::Instrument;
1373
/// Borrowed inputs describing a single flow invocation.
///
/// Every reference shares the lifetime `'a`, so the context cannot outlive the
/// caller-owned strings, observer, or mock layer it points at.
pub struct FlowContext<'a> {
    /// Tenant the invocation belongs to.
    pub tenant: &'a str,
    /// Identifier of the flow to run.
    pub flow_id: &'a str,
    /// Node identifier, when one is supplied by the caller.
    pub node_id: Option<&'a str>,
    /// Optional tool name. NOTE(review): consumer not visible here — confirm usage.
    pub tool: Option<&'a str>,
    /// Optional action name. NOTE(review): consumer not visible here — confirm usage.
    pub action: Option<&'a str>,
    /// Optional session identifier. NOTE(review): consumer not visible here — confirm usage.
    pub session_id: Option<&'a str>,
    /// Optional provider identifier. NOTE(review): consumer not visible here — confirm usage.
    pub provider_id: Option<&'a str>,
    /// Retry settings (attempt count and base delay) for this invocation.
    pub retry_config: RetryConfig,
    /// Optional observer that receives node lifecycle callbacks.
    pub observer: Option<&'a dyn ExecutionObserver>,
    /// Optional mock layer. NOTE(review): presumably substitutes real calls in tests — confirm.
    pub mocks: Option<&'a MockLayer>,
}
1386
/// Retry settings carried by a [`FlowContext`].
///
/// The type is plain data, so it derives `Debug`, `PartialEq` and `Eq` in addition to
/// `Copy`/`Clone`; this lets it be logged and compared directly in assertions.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct RetryConfig {
    /// Maximum number of attempts. Raised to at least 1 when built from a
    /// `FlowRetryConfig`; values constructed directly are not clamped.
    pub max_attempts: u32,
    /// Base delay in milliseconds. Raised to at least 50 when built from a
    /// `FlowRetryConfig`; values constructed directly are not clamped.
    pub base_delay_ms: u64,
}
1392
1393fn should_retry(err: &anyhow::Error) -> bool {
1394 let lower = err.to_string().to_lowercase();
1395 lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
1396}
1397
1398impl From<FlowRetryConfig> for RetryConfig {
1399 fn from(value: FlowRetryConfig) -> Self {
1400 Self {
1401 max_attempts: value.max_attempts.max(1),
1402 base_delay_ms: value.base_delay_ms.max(50),
1403 }
1404 }
1405}