1use std::collections::HashMap;
2use std::env;
3use std::error::Error as StdError;
4use std::str::FromStr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
9use anyhow::{Context, Result, anyhow, bail};
10use indexmap::IndexMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use serde_json::{Map as JsonMap, Value, json};
14use tokio::task;
15
16use super::mocks::MockLayer;
17use super::templating::{TemplateOptions, render_template_value};
18use crate::config::{FlowRetryConfig, HostConfig};
19use crate::pack::{FlowDescriptor, PackRuntime};
20use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};
21use greentic_types::{Flow, Node, NodeId, Routing};
22
23pub struct FlowEngine {
24 packs: Vec<Arc<PackRuntime>>,
25 flows: Vec<FlowDescriptor>,
26 flow_sources: HashMap<FlowKey, usize>,
27 flow_cache: RwLock<HashMap<FlowKey, HostFlow>>,
28 default_env: String,
29}
30
31#[derive(Clone, Debug, PartialEq, Eq, Hash)]
32struct FlowKey {
33 pack_id: String,
34 flow_id: String,
35}
36
37#[derive(Clone, Debug, Serialize, Deserialize)]
38pub struct FlowSnapshot {
39 pub pack_id: String,
40 pub flow_id: String,
41 pub next_node: String,
42 pub state: ExecutionState,
43}
44
45#[derive(Clone, Debug)]
46pub struct FlowWait {
47 pub reason: Option<String>,
48 pub snapshot: FlowSnapshot,
49}
50
51#[derive(Clone, Debug)]
52pub enum FlowStatus {
53 Completed,
54 Waiting(Box<FlowWait>),
55}
56
57#[derive(Clone, Debug)]
58pub struct FlowExecution {
59 pub output: Value,
60 pub status: FlowStatus,
61}
62
63#[derive(Clone, Debug)]
64struct HostFlow {
65 id: String,
66 start: Option<NodeId>,
67 nodes: IndexMap<NodeId, HostNode>,
68}
69
70#[derive(Clone, Debug)]
71pub struct HostNode {
72 kind: NodeKind,
73 pub component: String,
75 component_id: String,
76 operation_name: Option<String>,
77 operation_in_mapping: Option<String>,
78 payload_expr: Value,
79 routing: Routing,
80}
81
82#[derive(Clone, Debug)]
83enum NodeKind {
84 Exec { target_component: String },
85 PackComponent { component_ref: String },
86 ProviderInvoke,
87 FlowCall,
88 BuiltinEmit { kind: EmitKind },
89 Wait,
90}
91
92#[derive(Clone, Debug)]
93enum EmitKind {
94 Log,
95 Response,
96 Other(String),
97}
98
99struct ComponentOverrides<'a> {
100 component: Option<&'a str>,
101 operation: Option<&'a str>,
102}
103
104struct ComponentCall {
105 component_ref: String,
106 operation: String,
107 input: Value,
108 config: Value,
109}
110
111impl FlowExecution {
112 fn completed(output: Value) -> Self {
113 Self {
114 output,
115 status: FlowStatus::Completed,
116 }
117 }
118
119 fn waiting(output: Value, wait: FlowWait) -> Self {
120 Self {
121 output,
122 status: FlowStatus::Waiting(Box::new(wait)),
123 }
124 }
125}
126
127impl FlowEngine {
128 pub async fn new(packs: Vec<Arc<PackRuntime>>, config: Arc<HostConfig>) -> Result<Self> {
129 let mut flow_sources: HashMap<FlowKey, usize> = HashMap::new();
130 let mut descriptors = Vec::new();
131 let mut bindings = HashMap::new();
132 for pack in &config.pack_bindings {
133 bindings.insert(pack.pack_id.clone(), pack.flows.clone());
134 }
135 let enforce_bindings = !bindings.is_empty();
136 for (idx, pack) in packs.iter().enumerate() {
137 let pack_id = pack.metadata().pack_id.clone();
138 if enforce_bindings && !bindings.contains_key(&pack_id) {
139 bail!("no gtbind entries found for pack {}", pack_id);
140 }
141 let flows = pack.list_flows().await?;
142 let allowed = bindings.get(&pack_id).map(|flows| {
143 flows
144 .iter()
145 .cloned()
146 .collect::<std::collections::HashSet<_>>()
147 });
148 let mut seen = std::collections::HashSet::new();
149 for flow in flows {
150 if let Some(ref allow) = allowed
151 && !allow.contains(&flow.id)
152 {
153 continue;
154 }
155 seen.insert(flow.id.clone());
156 tracing::info!(
157 flow_id = %flow.id,
158 flow_type = %flow.flow_type,
159 pack_id = %flow.pack_id,
160 pack_index = idx,
161 "registered flow"
162 );
163 flow_sources.insert(
164 FlowKey {
165 pack_id: flow.pack_id.clone(),
166 flow_id: flow.id.clone(),
167 },
168 idx,
169 );
170 descriptors.retain(|existing: &FlowDescriptor| {
171 !(existing.id == flow.id && existing.pack_id == flow.pack_id)
172 });
173 descriptors.push(flow);
174 }
175 if let Some(allow) = allowed {
176 let missing = allow.difference(&seen).cloned().collect::<Vec<_>>();
177 if !missing.is_empty() {
178 bail!(
179 "gtbind flow ids missing in pack {}: {}",
180 pack_id,
181 missing.join(", ")
182 );
183 }
184 }
185 }
186
187 let mut flow_map = HashMap::new();
188 for flow in &descriptors {
189 let pack_id = flow.pack_id.clone();
190 if let Some(&pack_idx) = flow_sources.get(&FlowKey {
191 pack_id: pack_id.clone(),
192 flow_id: flow.id.clone(),
193 }) {
194 let pack_clone = Arc::clone(&packs[pack_idx]);
195 let flow_id = flow.id.clone();
196 let task_flow_id = flow_id.clone();
197 match task::spawn_blocking(move || pack_clone.load_flow(&task_flow_id)).await {
198 Ok(Ok(loaded_flow)) => {
199 flow_map.insert(
200 FlowKey {
201 pack_id: pack_id.clone(),
202 flow_id,
203 },
204 HostFlow::from(loaded_flow),
205 );
206 }
207 Ok(Err(err)) => {
208 tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
209 }
210 Err(err) => {
211 tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
212 }
213 }
214 }
215 }
216
217 Ok(Self {
218 packs,
219 flows: descriptors,
220 flow_sources,
221 flow_cache: RwLock::new(flow_map),
222 default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
223 })
224 }
225
226 async fn get_or_load_flow(&self, pack_id: &str, flow_id: &str) -> Result<HostFlow> {
227 let key = FlowKey {
228 pack_id: pack_id.to_string(),
229 flow_id: flow_id.to_string(),
230 };
231 if let Some(flow) = self.flow_cache.read().get(&key).cloned() {
232 return Ok(flow);
233 }
234
235 let pack_idx = *self
236 .flow_sources
237 .get(&key)
238 .with_context(|| format!("flow {pack_id}:{flow_id} not registered"))?;
239 let pack = Arc::clone(&self.packs[pack_idx]);
240 let flow_id_owned = flow_id.to_string();
241 let task_flow_id = flow_id_owned.clone();
242 let flow = task::spawn_blocking(move || pack.load_flow(&task_flow_id))
243 .await
244 .context("failed to join flow metadata task")??;
245 let host_flow = HostFlow::from(flow);
246 self.flow_cache.write().insert(
247 FlowKey {
248 pack_id: pack_id.to_string(),
249 flow_id: flow_id_owned.clone(),
250 },
251 host_flow.clone(),
252 );
253 Ok(host_flow)
254 }
255
256 pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
257 let span = tracing::info_span!(
258 "flow.execute",
259 tenant = tracing::field::Empty,
260 flow_id = tracing::field::Empty,
261 node_id = tracing::field::Empty,
262 tool = tracing::field::Empty,
263 action = tracing::field::Empty
264 );
265 annotate_span(
266 &span,
267 &FlowSpanAttributes {
268 tenant: ctx.tenant,
269 flow_id: ctx.flow_id,
270 node_id: ctx.node_id,
271 tool: ctx.tool,
272 action: ctx.action,
273 },
274 );
275 set_flow_context(
276 &self.default_env,
277 ctx.tenant,
278 ctx.flow_id,
279 ctx.node_id,
280 ctx.provider_id,
281 ctx.session_id,
282 );
283 let retry_config = ctx.retry_config;
284 let original_input = input;
285 async move {
286 let mut attempt = 0u32;
287 loop {
288 attempt += 1;
289 match self.execute_once(&ctx, original_input.clone()).await {
290 Ok(value) => return Ok(value),
291 Err(err) => {
292 if attempt >= retry_config.max_attempts || !should_retry(&err) {
293 return Err(err);
294 }
295 let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
296 tracing::warn!(
297 tenant = ctx.tenant,
298 flow_id = ctx.flow_id,
299 attempt,
300 max_attempts = retry_config.max_attempts,
301 delay_ms = delay,
302 error = %err,
303 "transient flow execution failure, backing off"
304 );
305 tokio::time::sleep(Duration::from_millis(delay)).await;
306 }
307 }
308 }
309 }
310 .instrument(span)
311 .await
312 }
313
314 pub async fn resume(
315 &self,
316 ctx: FlowContext<'_>,
317 snapshot: FlowSnapshot,
318 input: Value,
319 ) -> Result<FlowExecution> {
320 if snapshot.pack_id != ctx.pack_id {
321 bail!(
322 "snapshot pack {} does not match requested {}",
323 snapshot.pack_id,
324 ctx.pack_id
325 );
326 }
327 if snapshot.flow_id != ctx.flow_id {
328 bail!(
329 "snapshot flow {} does not match requested {}",
330 snapshot.flow_id,
331 ctx.flow_id
332 );
333 }
334 let flow_ir = self.get_or_load_flow(ctx.pack_id, ctx.flow_id).await?;
335 let mut state = snapshot.state;
336 state.replace_input(input);
337 state.ensure_entry();
338 self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
339 .await
340 }
341
342 async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
343 let flow_ir = self.get_or_load_flow(ctx.pack_id, ctx.flow_id).await?;
344 let state = ExecutionState::new(input);
345 self.drive_flow(ctx, flow_ir, state, None).await
346 }
347
348 async fn drive_flow(
349 &self,
350 ctx: &FlowContext<'_>,
351 flow_ir: HostFlow,
352 mut state: ExecutionState,
353 resume_from: Option<String>,
354 ) -> Result<FlowExecution> {
355 let mut current = match resume_from {
356 Some(node) => NodeId::from_str(&node)
357 .with_context(|| format!("invalid resume node id `{node}`"))?,
358 None => flow_ir
359 .start
360 .clone()
361 .or_else(|| flow_ir.nodes.keys().next().cloned())
362 .with_context(|| format!("flow {} has no start node", flow_ir.id))?,
363 };
364
365 loop {
366 let node = flow_ir
367 .nodes
368 .get(¤t)
369 .with_context(|| format!("node {} not found", current.as_str()))?;
370
371 let payload_template = node.payload_expr.clone();
372 let prev = state
373 .last_output
374 .as_ref()
375 .cloned()
376 .unwrap_or_else(|| Value::Object(JsonMap::new()));
377 let ctx_value = template_context(&state, prev);
378 let payload =
379 render_template_value(&payload_template, &ctx_value, TemplateOptions::default())
380 .context("failed to render node input template")?;
381 let observed_payload = payload.clone();
382 let node_id = current.clone();
383 let event = NodeEvent {
384 context: ctx,
385 node_id: node_id.as_str(),
386 node,
387 payload: &observed_payload,
388 };
389 if let Some(observer) = ctx.observer {
390 observer.on_node_start(&event);
391 }
392 let DispatchOutcome {
393 output,
394 wait_reason,
395 } = self
396 .dispatch_node(ctx, node_id.as_str(), node, &mut state, payload)
397 .await?;
398
399 state.nodes.insert(node_id.clone().into(), output.clone());
400 state.last_output = Some(output.payload.clone());
401 if let Some(observer) = ctx.observer {
402 observer.on_node_end(&event, &output.payload);
403 }
404
405 let (next, should_exit) = match &node.routing {
406 Routing::Next { node_id } => (Some(node_id.clone()), false),
407 Routing::End | Routing::Reply => (None, true),
408 Routing::Branch { default, .. } => (default.clone(), default.is_none()),
409 Routing::Custom(raw) => {
410 tracing::warn!(
411 flow_id = %flow_ir.id,
412 node_id = %node_id,
413 routing = ?raw,
414 "unsupported routing; terminating flow"
415 );
416 (None, true)
417 }
418 };
419
420 if let Some(wait_reason) = wait_reason {
421 let resume_target = next.clone().ok_or_else(|| {
422 anyhow!(
423 "session.wait node {} requires a non-empty route",
424 current.as_str()
425 )
426 })?;
427 let mut snapshot_state = state.clone();
428 snapshot_state.clear_egress();
429 let snapshot = FlowSnapshot {
430 pack_id: ctx.pack_id.to_string(),
431 flow_id: ctx.flow_id.to_string(),
432 next_node: resume_target.as_str().to_string(),
433 state: snapshot_state,
434 };
435 let output_value = state.clone().finalize_with(None);
436 return Ok(FlowExecution::waiting(
437 output_value,
438 FlowWait {
439 reason: Some(wait_reason),
440 snapshot,
441 },
442 ));
443 }
444
445 if should_exit {
446 return Ok(FlowExecution::completed(
447 state.finalize_with(Some(output.payload.clone())),
448 ));
449 }
450
451 match next {
452 Some(n) => current = n,
453 None => {
454 return Ok(FlowExecution::completed(
455 state.finalize_with(Some(output.payload.clone())),
456 ));
457 }
458 }
459 }
460 }
461
462 async fn dispatch_node(
463 &self,
464 ctx: &FlowContext<'_>,
465 node_id: &str,
466 node: &HostNode,
467 state: &mut ExecutionState,
468 payload: Value,
469 ) -> Result<DispatchOutcome> {
470 match &node.kind {
471 NodeKind::Exec { target_component } => self
472 .execute_component_exec(
473 ctx,
474 node_id,
475 node,
476 payload,
477 ComponentOverrides {
478 component: Some(target_component.as_str()),
479 operation: node.operation_name.as_deref(),
480 },
481 )
482 .await
483 .map(DispatchOutcome::complete),
484 NodeKind::PackComponent { component_ref } => self
485 .execute_component_call(ctx, node_id, node, payload, component_ref.as_str())
486 .await
487 .map(DispatchOutcome::complete),
488 NodeKind::FlowCall => self
489 .execute_flow_call(ctx, payload)
490 .await
491 .map(DispatchOutcome::complete),
492 NodeKind::ProviderInvoke => self
493 .execute_provider_invoke(ctx, node_id, state, payload)
494 .await
495 .map(DispatchOutcome::complete),
496 NodeKind::BuiltinEmit { kind } => {
497 match kind {
498 EmitKind::Log | EmitKind::Response => {}
499 EmitKind::Other(component) => {
500 tracing::debug!(%component, "handling emit.* as builtin");
501 }
502 }
503 state.push_egress(payload.clone());
504 Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
505 }
506 NodeKind::Wait => {
507 let reason = extract_wait_reason(&payload);
508 Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
509 }
510 }
511 }
512
513 async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
514 #[derive(Deserialize)]
515 struct FlowCallPayload {
516 #[serde(alias = "flow")]
517 flow_id: String,
518 #[serde(default)]
519 input: Value,
520 }
521
522 let call: FlowCallPayload =
523 serde_json::from_value(payload).context("invalid payload for flow.call node")?;
524 if call.flow_id.trim().is_empty() {
525 bail!("flow.call requires a non-empty flow_id");
526 }
527
528 let sub_input = if call.input.is_null() {
529 Value::Null
530 } else {
531 call.input
532 };
533
534 let flow_id_owned = call.flow_id;
535 let action = "flow.call";
536 let sub_ctx = FlowContext {
537 tenant: ctx.tenant,
538 pack_id: ctx.pack_id,
539 flow_id: flow_id_owned.as_str(),
540 node_id: None,
541 tool: ctx.tool,
542 action: Some(action),
543 session_id: ctx.session_id,
544 provider_id: ctx.provider_id,
545 retry_config: ctx.retry_config,
546 observer: ctx.observer,
547 mocks: ctx.mocks,
548 };
549
550 let execution = Box::pin(self.execute(sub_ctx, sub_input))
551 .await
552 .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
553 match execution.status {
554 FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
555 FlowStatus::Waiting(wait) => bail!(
556 "flow.call cannot pause (flow {} waiting {:?})",
557 flow_id_owned,
558 wait.reason
559 ),
560 }
561 }
562
563 async fn execute_component_exec(
564 &self,
565 ctx: &FlowContext<'_>,
566 node_id: &str,
567 node: &HostNode,
568 payload: Value,
569 overrides: ComponentOverrides<'_>,
570 ) -> Result<NodeOutput> {
571 #[derive(Deserialize)]
572 struct ComponentPayload {
573 #[serde(default, alias = "component_ref", alias = "component")]
574 component: Option<String>,
575 #[serde(alias = "op")]
576 operation: Option<String>,
577 #[serde(default)]
578 input: Value,
579 #[serde(default)]
580 config: Value,
581 }
582
583 let payload: ComponentPayload =
584 serde_json::from_value(payload).context("invalid payload for component.exec")?;
585 let component_ref = overrides
586 .component
587 .map(str::to_string)
588 .or_else(|| payload.component.filter(|v| !v.trim().is_empty()))
589 .with_context(|| "component.exec requires a component_ref")?;
590 let operation = resolve_component_operation(
591 node_id,
592 node.component_id.as_str(),
593 payload.operation,
594 overrides.operation,
595 node.operation_in_mapping.as_deref(),
596 )?;
597 let call = ComponentCall {
598 component_ref,
599 operation,
600 input: payload.input,
601 config: payload.config,
602 };
603
604 self.invoke_component_call(ctx, node_id, call).await
605 }
606
607 async fn execute_component_call(
608 &self,
609 ctx: &FlowContext<'_>,
610 node_id: &str,
611 node: &HostNode,
612 payload: Value,
613 component_ref: &str,
614 ) -> Result<NodeOutput> {
615 let payload_operation = extract_operation_from_mapping(&payload);
616 let (input, config) = split_operation_payload(payload);
617 let operation = resolve_component_operation(
618 node_id,
619 node.component_id.as_str(),
620 payload_operation,
621 node.operation_name.as_deref(),
622 node.operation_in_mapping.as_deref(),
623 )?;
624 let call = ComponentCall {
625 component_ref: component_ref.to_string(),
626 operation,
627 input,
628 config,
629 };
630 self.invoke_component_call(ctx, node_id, call).await
631 }
632
633 async fn invoke_component_call(
634 &self,
635 ctx: &FlowContext<'_>,
636 node_id: &str,
637 call: ComponentCall,
638 ) -> Result<NodeOutput> {
639 let input_json = serde_json::to_string(&call.input)?;
640 let config_json = if call.config.is_null() {
641 None
642 } else {
643 Some(serde_json::to_string(&call.config)?)
644 };
645
646 let key = FlowKey {
647 pack_id: ctx.pack_id.to_string(),
648 flow_id: ctx.flow_id.to_string(),
649 };
650 let pack_idx = *self.flow_sources.get(&key).with_context(|| {
651 format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
652 })?;
653 let pack = Arc::clone(&self.packs[pack_idx]);
654 let exec_ctx = component_exec_ctx(ctx, node_id);
655 let value = pack
656 .invoke_component(
657 call.component_ref.as_str(),
658 exec_ctx,
659 call.operation.as_str(),
660 config_json,
661 input_json,
662 )
663 .await?;
664
665 Ok(NodeOutput::new(value))
666 }
667
668 async fn execute_provider_invoke(
669 &self,
670 ctx: &FlowContext<'_>,
671 node_id: &str,
672 state: &ExecutionState,
673 payload: Value,
674 ) -> Result<NodeOutput> {
675 #[derive(Deserialize)]
676 struct ProviderPayload {
677 #[serde(default)]
678 provider_id: Option<String>,
679 #[serde(default)]
680 provider_type: Option<String>,
681 #[serde(default, alias = "operation")]
682 op: Option<String>,
683 #[serde(default)]
684 input: Value,
685 #[serde(default)]
686 in_map: Value,
687 #[serde(default)]
688 out_map: Value,
689 #[serde(default)]
690 err_map: Value,
691 }
692
693 let payload: ProviderPayload =
694 serde_json::from_value(payload).context("invalid payload for provider.invoke")?;
695 let op = payload
696 .op
697 .as_deref()
698 .filter(|v| !v.trim().is_empty())
699 .with_context(|| "provider.invoke requires an op")?
700 .to_string();
701
702 let prev = state
703 .last_output
704 .as_ref()
705 .cloned()
706 .unwrap_or_else(|| Value::Object(JsonMap::new()));
707 let base_ctx = template_context(state, prev);
708
709 let input_value = if !payload.in_map.is_null() {
710 let mut ctx_value = base_ctx.clone();
711 if let Value::Object(ref mut map) = ctx_value {
712 map.insert("input".into(), payload.input.clone());
713 map.insert("result".into(), payload.input.clone());
714 }
715 render_template_value(
716 &payload.in_map,
717 &ctx_value,
718 TemplateOptions {
719 allow_pointer: true,
720 },
721 )
722 .context("failed to render provider.invoke in_map")?
723 } else if !payload.input.is_null() {
724 payload.input
725 } else {
726 Value::Null
727 };
728 let input_json = serde_json::to_vec(&input_value)?;
729
730 let key = FlowKey {
731 pack_id: ctx.pack_id.to_string(),
732 flow_id: ctx.flow_id.to_string(),
733 };
734 let pack_idx = *self.flow_sources.get(&key).with_context(|| {
735 format!("flow {} (pack {}) not registered", ctx.flow_id, ctx.pack_id)
736 })?;
737 let pack = Arc::clone(&self.packs[pack_idx]);
738 let binding = pack.resolve_provider(
739 payload.provider_id.as_deref(),
740 payload.provider_type.as_deref(),
741 )?;
742 let exec_ctx = component_exec_ctx(ctx, node_id);
743 let result = pack
744 .invoke_provider(&binding, exec_ctx, &op, input_json)
745 .await?;
746
747 let output = if payload.out_map.is_null() {
748 result
749 } else {
750 let mut ctx_value = base_ctx;
751 if let Value::Object(ref mut map) = ctx_value {
752 map.insert("input".into(), result.clone());
753 map.insert("result".into(), result.clone());
754 }
755 render_template_value(
756 &payload.out_map,
757 &ctx_value,
758 TemplateOptions {
759 allow_pointer: true,
760 },
761 )
762 .context("failed to render provider.invoke out_map")?
763 };
764 let _ = payload.err_map;
765 Ok(NodeOutput::new(output))
766 }
767
768 pub fn flows(&self) -> &[FlowDescriptor] {
769 &self.flows
770 }
771
772 pub fn flow_by_key(&self, pack_id: &str, flow_id: &str) -> Option<&FlowDescriptor> {
773 self.flows
774 .iter()
775 .find(|descriptor| descriptor.pack_id == pack_id && descriptor.id == flow_id)
776 }
777
778 pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
779 let mut matches = self
780 .flows
781 .iter()
782 .filter(|descriptor| descriptor.flow_type == flow_type);
783 let first = matches.next()?;
784 if matches.next().is_some() {
785 return None;
786 }
787 Some(first)
788 }
789
790 pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
791 let mut matches = self
792 .flows
793 .iter()
794 .filter(|descriptor| descriptor.id == flow_id);
795 let first = matches.next()?;
796 if matches.next().is_some() {
797 return None;
798 }
799 Some(first)
800 }
801}
802
803pub trait ExecutionObserver: Send + Sync {
804 fn on_node_start(&self, event: &NodeEvent<'_>);
805 fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
806 fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
807}
808
809pub struct NodeEvent<'a> {
810 pub context: &'a FlowContext<'a>,
811 pub node_id: &'a str,
812 pub node: &'a HostNode,
813 pub payload: &'a Value,
814}
815
816#[derive(Clone, Debug, Serialize, Deserialize)]
817pub struct ExecutionState {
818 #[serde(default)]
819 entry: Value,
820 #[serde(default)]
821 input: Value,
822 #[serde(default)]
823 nodes: HashMap<String, NodeOutput>,
824 #[serde(default)]
825 egress: Vec<Value>,
826 #[serde(default, skip_serializing_if = "Option::is_none")]
827 last_output: Option<Value>,
828}
829
830impl ExecutionState {
831 fn new(input: Value) -> Self {
832 Self {
833 entry: input.clone(),
834 input,
835 nodes: HashMap::new(),
836 egress: Vec::new(),
837 last_output: None,
838 }
839 }
840
841 fn ensure_entry(&mut self) {
842 if self.entry.is_null() {
843 self.entry = self.input.clone();
844 }
845 }
846
847 fn context(&self) -> Value {
848 let mut nodes = JsonMap::new();
849 for (id, output) in &self.nodes {
850 nodes.insert(
851 id.clone(),
852 json!({
853 "ok": output.ok,
854 "payload": output.payload.clone(),
855 "meta": output.meta.clone(),
856 }),
857 );
858 }
859 json!({
860 "entry": self.entry.clone(),
861 "input": self.input.clone(),
862 "nodes": nodes,
863 })
864 }
865
866 fn outputs_map(&self) -> JsonMap<String, Value> {
867 let mut outputs = JsonMap::new();
868 for (id, output) in &self.nodes {
869 outputs.insert(id.clone(), output.payload.clone());
870 }
871 outputs
872 }
873 fn push_egress(&mut self, payload: Value) {
874 self.egress.push(payload);
875 }
876
877 fn replace_input(&mut self, input: Value) {
878 self.input = input;
879 }
880
881 fn clear_egress(&mut self) {
882 self.egress.clear();
883 }
884
885 fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
886 if self.egress.is_empty() {
887 return final_payload.unwrap_or(Value::Null);
888 }
889 let mut emitted = std::mem::take(&mut self.egress);
890 if let Some(value) = final_payload {
891 match value {
892 Value::Null => {}
893 Value::Array(items) => emitted.extend(items),
894 other => emitted.push(other),
895 }
896 }
897 Value::Array(emitted)
898 }
899}
900
901#[derive(Clone, Debug, Serialize, Deserialize)]
902struct NodeOutput {
903 ok: bool,
904 payload: Value,
905 meta: Value,
906}
907
908impl NodeOutput {
909 fn new(payload: Value) -> Self {
910 Self {
911 ok: true,
912 payload,
913 meta: Value::Null,
914 }
915 }
916}
917
918struct DispatchOutcome {
919 output: NodeOutput,
920 wait_reason: Option<String>,
921}
922
923impl DispatchOutcome {
924 fn complete(output: NodeOutput) -> Self {
925 Self {
926 output,
927 wait_reason: None,
928 }
929 }
930
931 fn wait(output: NodeOutput, reason: Option<String>) -> Self {
932 Self {
933 output,
934 wait_reason: reason,
935 }
936 }
937}
938
939fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
940 ComponentExecCtx {
941 tenant: ComponentTenantCtx {
942 tenant: ctx.tenant.to_string(),
943 team: None,
944 user: ctx.provider_id.map(str::to_string),
945 trace_id: None,
946 correlation_id: ctx.session_id.map(str::to_string),
947 deadline_unix_ms: None,
948 attempt: 1,
949 idempotency_key: ctx.session_id.map(str::to_string),
950 },
951 flow_id: ctx.flow_id.to_string(),
952 node_id: Some(node_id.to_string()),
953 }
954}
955
956fn extract_wait_reason(payload: &Value) -> Option<String> {
957 match payload {
958 Value::String(s) => Some(s.clone()),
959 Value::Object(map) => map
960 .get("reason")
961 .and_then(Value::as_str)
962 .map(|value| value.to_string()),
963 _ => None,
964 }
965}
966
967fn template_context(state: &ExecutionState, prev: Value) -> Value {
968 let entry = if state.entry.is_null() {
969 Value::Object(JsonMap::new())
970 } else {
971 state.entry.clone()
972 };
973 let mut ctx = JsonMap::new();
974 ctx.insert("entry".into(), entry);
975 ctx.insert("prev".into(), prev);
976 ctx.insert("node".into(), Value::Object(state.outputs_map()));
977 ctx.insert("state".into(), state.context());
978 Value::Object(ctx)
979}
980
981impl From<Flow> for HostFlow {
982 fn from(value: Flow) -> Self {
983 let mut nodes = IndexMap::new();
984 for (id, node) in value.nodes {
985 nodes.insert(id.clone(), HostNode::from(node));
986 }
987 let start = value
988 .entrypoints
989 .get("default")
990 .and_then(Value::as_str)
991 .and_then(|id| NodeId::from_str(id).ok())
992 .or_else(|| nodes.keys().next().cloned());
993 Self {
994 id: value.id.as_str().to_string(),
995 start,
996 nodes,
997 }
998 }
999}
1000
1001impl From<Node> for HostNode {
1002 fn from(node: Node) -> Self {
1003 let component_ref = node.component.id.as_str().to_string();
1004 let raw_operation = node.component.operation.clone();
1005 let operation_in_mapping = extract_operation_from_mapping(&node.input.mapping);
1006 let operation_is_component_exec = raw_operation.as_deref() == Some("component.exec");
1007 let operation_is_emit = raw_operation
1008 .as_deref()
1009 .map(|op| op.starts_with("emit."))
1010 .unwrap_or(false);
1011 let is_component_exec = component_ref == "component.exec" || operation_is_component_exec;
1012
1013 let kind = if is_component_exec {
1014 let target = if component_ref == "component.exec" {
1015 if let Some(op) = raw_operation
1016 .as_deref()
1017 .filter(|op| op.starts_with("emit."))
1018 {
1019 op.to_string()
1020 } else {
1021 extract_target_component(&node.input.mapping)
1022 .unwrap_or_else(|| "component.exec".to_string())
1023 }
1024 } else {
1025 extract_target_component(&node.input.mapping)
1026 .unwrap_or_else(|| component_ref.clone())
1027 };
1028 if target.starts_with("emit.") {
1029 NodeKind::BuiltinEmit {
1030 kind: emit_kind_from_ref(&target),
1031 }
1032 } else {
1033 NodeKind::Exec {
1034 target_component: target,
1035 }
1036 }
1037 } else if operation_is_emit {
1038 NodeKind::BuiltinEmit {
1039 kind: emit_kind_from_ref(raw_operation.as_deref().unwrap_or("emit.log")),
1040 }
1041 } else {
1042 match component_ref.as_str() {
1043 "flow.call" => NodeKind::FlowCall,
1044 "provider.invoke" => NodeKind::ProviderInvoke,
1045 "session.wait" => NodeKind::Wait,
1046 comp if comp.starts_with("emit.") => NodeKind::BuiltinEmit {
1047 kind: emit_kind_from_ref(comp),
1048 },
1049 other => NodeKind::PackComponent {
1050 component_ref: other.to_string(),
1051 },
1052 }
1053 };
1054 let component_label = match &kind {
1055 NodeKind::Exec { .. } => "component.exec".to_string(),
1056 NodeKind::PackComponent { component_ref } => component_ref.clone(),
1057 NodeKind::ProviderInvoke => "provider.invoke".to_string(),
1058 NodeKind::FlowCall => "flow.call".to_string(),
1059 NodeKind::BuiltinEmit { kind } => emit_ref_from_kind(kind),
1060 NodeKind::Wait => "session.wait".to_string(),
1061 };
1062 let operation_name = if is_component_exec && operation_is_component_exec {
1063 None
1064 } else {
1065 raw_operation.clone()
1066 };
1067 let payload_expr = match kind {
1068 NodeKind::BuiltinEmit { .. } => extract_emit_payload(&node.input.mapping),
1069 _ => node.input.mapping.clone(),
1070 };
1071 Self {
1072 kind,
1073 component: component_label,
1074 component_id: if is_component_exec {
1075 "component.exec".to_string()
1076 } else {
1077 component_ref
1078 },
1079 operation_name,
1080 operation_in_mapping,
1081 payload_expr,
1082 routing: node.routing,
1083 }
1084 }
1085}
1086
1087fn extract_target_component(payload: &Value) -> Option<String> {
1088 match payload {
1089 Value::Object(map) => map
1090 .get("component")
1091 .or_else(|| map.get("component_ref"))
1092 .and_then(Value::as_str)
1093 .map(|s| s.to_string()),
1094 _ => None,
1095 }
1096}
1097
1098fn extract_operation_from_mapping(payload: &Value) -> Option<String> {
1099 match payload {
1100 Value::Object(map) => map
1101 .get("operation")
1102 .or_else(|| map.get("op"))
1103 .and_then(Value::as_str)
1104 .map(str::trim)
1105 .filter(|value| !value.is_empty())
1106 .map(|value| value.to_string()),
1107 _ => None,
1108 }
1109}
1110
1111fn extract_emit_payload(payload: &Value) -> Value {
1112 if let Value::Object(map) = payload {
1113 if let Some(input) = map.get("input") {
1114 return input.clone();
1115 }
1116 if let Some(inner) = map.get("payload") {
1117 return inner.clone();
1118 }
1119 }
1120 payload.clone()
1121}
1122
1123fn split_operation_payload(payload: Value) -> (Value, Value) {
1124 if let Value::Object(mut map) = payload.clone()
1125 && map.contains_key("input")
1126 {
1127 let input = map.remove("input").unwrap_or(Value::Null);
1128 let config = map.remove("config").unwrap_or(Value::Null);
1129 let legacy_only = map.keys().all(|key| {
1130 matches!(
1131 key.as_str(),
1132 "operation" | "op" | "component" | "component_ref"
1133 )
1134 });
1135 if legacy_only {
1136 return (input, config);
1137 }
1138 }
1139 (payload, Value::Null)
1140}
1141
1142fn resolve_component_operation(
1143 node_id: &str,
1144 component_label: &str,
1145 payload_operation: Option<String>,
1146 operation_override: Option<&str>,
1147 operation_in_mapping: Option<&str>,
1148) -> Result<String> {
1149 if let Some(op) = operation_override
1150 .map(str::trim)
1151 .filter(|value| !value.is_empty())
1152 {
1153 return Ok(op.to_string());
1154 }
1155
1156 if let Some(op) = payload_operation
1157 .as_deref()
1158 .map(str::trim)
1159 .filter(|value| !value.is_empty())
1160 {
1161 return Ok(op.to_string());
1162 }
1163
1164 let mut message = format!(
1165 "missing operation for node `{}` (component `{}`); expected node.component.operation to be set",
1166 node_id, component_label,
1167 );
1168 if let Some(found) = operation_in_mapping {
1169 message.push_str(&format!(
1170 ". Found operation in input.mapping (`{}`) but this is not used; pack compiler must preserve node.component.operation.",
1171 found
1172 ));
1173 }
1174 bail!(message);
1175}
1176
1177fn emit_kind_from_ref(component_ref: &str) -> EmitKind {
1178 match component_ref {
1179 "emit.log" => EmitKind::Log,
1180 "emit.response" => EmitKind::Response,
1181 other => EmitKind::Other(other.to_string()),
1182 }
1183}
1184
1185fn emit_ref_from_kind(kind: &EmitKind) -> String {
1186 match kind {
1187 EmitKind::Log => "emit.log".to_string(),
1188 EmitKind::Response => "emit.response".to_string(),
1189 EmitKind::Other(other) => other.clone(),
1190 }
1191}
1192
1193#[cfg(test)]
1194mod tests {
1195 use super::*;
1196 use greentic_types::{
1197 Flow, FlowComponentRef, FlowId, FlowKind, InputMapping, Node, NodeId, OutputMapping,
1198 Routing, TelemetryHints,
1199 };
1200 use serde_json::json;
1201 use std::collections::BTreeMap;
1202 use std::str::FromStr;
1203 use std::sync::Mutex;
1204 use tokio::runtime::Runtime;
1205
1206 fn minimal_engine() -> FlowEngine {
1207 FlowEngine {
1208 packs: Vec::new(),
1209 flows: Vec::new(),
1210 flow_sources: HashMap::new(),
1211 flow_cache: RwLock::new(HashMap::new()),
1212 default_env: "local".to_string(),
1213 }
1214 }
1215
1216 #[test]
1217 fn templating_renders_with_partials_and_data() {
1218 let mut state = ExecutionState::new(json!({ "city": "London" }));
1219 state.nodes.insert(
1220 "forecast".to_string(),
1221 NodeOutput::new(json!({ "temp": "20C" })),
1222 );
1223
1224 let ctx = state.context();
1226 assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
1227 }
1228
1229 #[test]
1230 fn finalize_wraps_emitted_payloads() {
1231 let mut state = ExecutionState::new(json!({}));
1232 state.push_egress(json!({ "text": "first" }));
1233 state.push_egress(json!({ "text": "second" }));
1234 let result = state.finalize_with(Some(json!({ "text": "final" })));
1235 assert_eq!(
1236 result,
1237 json!([
1238 { "text": "first" },
1239 { "text": "second" },
1240 { "text": "final" }
1241 ])
1242 );
1243 }
1244
1245 #[test]
1246 fn finalize_flattens_final_array() {
1247 let mut state = ExecutionState::new(json!({}));
1248 state.push_egress(json!({ "text": "only" }));
1249 let result = state.finalize_with(Some(json!([
1250 { "text": "extra-1" },
1251 { "text": "extra-2" }
1252 ])));
1253 assert_eq!(
1254 result,
1255 json!([
1256 { "text": "only" },
1257 { "text": "extra-1" },
1258 { "text": "extra-2" }
1259 ])
1260 );
1261 }
1262
1263 #[test]
1264 fn missing_operation_reports_node_and_component() {
1265 let engine = minimal_engine();
1266 let rt = Runtime::new().unwrap();
1267 let retry_config = RetryConfig {
1268 max_attempts: 1,
1269 base_delay_ms: 1,
1270 };
1271 let ctx = FlowContext {
1272 tenant: "tenant",
1273 pack_id: "test-pack",
1274 flow_id: "flow",
1275 node_id: Some("missing-op"),
1276 tool: None,
1277 action: None,
1278 session_id: None,
1279 provider_id: None,
1280 retry_config,
1281 observer: None,
1282 mocks: None,
1283 };
1284 let node = HostNode {
1285 kind: NodeKind::Exec {
1286 target_component: "qa.process".into(),
1287 },
1288 component: "component.exec".into(),
1289 component_id: "component.exec".into(),
1290 operation_name: None,
1291 operation_in_mapping: None,
1292 payload_expr: Value::Null,
1293 routing: Routing::End,
1294 };
1295 let _state = ExecutionState::new(Value::Null);
1296 let payload = json!({ "component": "qa.process" });
1297 let err = rt
1298 .block_on(engine.execute_component_exec(
1299 &ctx,
1300 "missing-op",
1301 &node,
1302 payload,
1303 ComponentOverrides {
1304 component: None,
1305 operation: None,
1306 },
1307 ))
1308 .unwrap_err();
1309 let message = err.to_string();
1310 assert!(
1311 message.contains("missing operation for node `missing-op`"),
1312 "unexpected message: {message}"
1313 );
1314 assert!(
1315 message.contains("(component `component.exec`)"),
1316 "unexpected message: {message}"
1317 );
1318 }
1319
1320 #[test]
1321 fn missing_operation_mentions_mapping_hint() {
1322 let engine = minimal_engine();
1323 let rt = Runtime::new().unwrap();
1324 let retry_config = RetryConfig {
1325 max_attempts: 1,
1326 base_delay_ms: 1,
1327 };
1328 let ctx = FlowContext {
1329 tenant: "tenant",
1330 pack_id: "test-pack",
1331 flow_id: "flow",
1332 node_id: Some("missing-op-hint"),
1333 tool: None,
1334 action: None,
1335 session_id: None,
1336 provider_id: None,
1337 retry_config,
1338 observer: None,
1339 mocks: None,
1340 };
1341 let node = HostNode {
1342 kind: NodeKind::Exec {
1343 target_component: "qa.process".into(),
1344 },
1345 component: "component.exec".into(),
1346 component_id: "component.exec".into(),
1347 operation_name: None,
1348 operation_in_mapping: Some("render".into()),
1349 payload_expr: Value::Null,
1350 routing: Routing::End,
1351 };
1352 let _state = ExecutionState::new(Value::Null);
1353 let payload = json!({ "component": "qa.process" });
1354 let err = rt
1355 .block_on(engine.execute_component_exec(
1356 &ctx,
1357 "missing-op-hint",
1358 &node,
1359 payload,
1360 ComponentOverrides {
1361 component: None,
1362 operation: None,
1363 },
1364 ))
1365 .unwrap_err();
1366 let message = err.to_string();
1367 assert!(
1368 message.contains("missing operation for node `missing-op-hint`"),
1369 "unexpected message: {message}"
1370 );
1371 assert!(
1372 message.contains("Found operation in input.mapping (`render`)"),
1373 "unexpected message: {message}"
1374 );
1375 }
1376
1377 struct CountingObserver {
1378 starts: Mutex<Vec<String>>,
1379 ends: Mutex<Vec<Value>>,
1380 }
1381
1382 impl CountingObserver {
1383 fn new() -> Self {
1384 Self {
1385 starts: Mutex::new(Vec::new()),
1386 ends: Mutex::new(Vec::new()),
1387 }
1388 }
1389 }
1390
1391 impl ExecutionObserver for CountingObserver {
1392 fn on_node_start(&self, event: &NodeEvent<'_>) {
1393 self.starts.lock().unwrap().push(event.node_id.to_string());
1394 }
1395
1396 fn on_node_end(&self, _event: &NodeEvent<'_>, output: &Value) {
1397 self.ends.lock().unwrap().push(output.clone());
1398 }
1399
1400 fn on_node_error(&self, _event: &NodeEvent<'_>, _error: &dyn StdError) {}
1401 }
1402
1403 #[test]
1404 fn emits_end_event_for_successful_node() {
1405 let node_id = NodeId::from_str("emit").unwrap();
1406 let node = Node {
1407 id: node_id.clone(),
1408 component: FlowComponentRef {
1409 id: "emit.log".parse().unwrap(),
1410 pack_alias: None,
1411 operation: None,
1412 },
1413 input: InputMapping {
1414 mapping: json!({ "message": "logged" }),
1415 },
1416 output: OutputMapping {
1417 mapping: Value::Null,
1418 },
1419 routing: Routing::End,
1420 telemetry: TelemetryHints::default(),
1421 };
1422 let mut nodes = indexmap::IndexMap::default();
1423 nodes.insert(node_id.clone(), node);
1424 let flow = Flow {
1425 schema_version: "1.0".into(),
1426 id: FlowId::from_str("emit.flow").unwrap(),
1427 kind: FlowKind::Messaging,
1428 entrypoints: BTreeMap::from([(
1429 "default".to_string(),
1430 Value::String(node_id.to_string()),
1431 )]),
1432 nodes,
1433 metadata: Default::default(),
1434 };
1435 let host_flow = HostFlow::from(flow);
1436
1437 let engine = FlowEngine {
1438 packs: Vec::new(),
1439 flows: Vec::new(),
1440 flow_sources: HashMap::new(),
1441 flow_cache: RwLock::new(HashMap::from([(
1442 FlowKey {
1443 pack_id: "test-pack".to_string(),
1444 flow_id: "emit.flow".to_string(),
1445 },
1446 host_flow,
1447 )])),
1448 default_env: "local".to_string(),
1449 };
1450 let observer = CountingObserver::new();
1451 let ctx = FlowContext {
1452 tenant: "demo",
1453 pack_id: "test-pack",
1454 flow_id: "emit.flow",
1455 node_id: None,
1456 tool: None,
1457 action: None,
1458 session_id: None,
1459 provider_id: None,
1460 retry_config: RetryConfig {
1461 max_attempts: 1,
1462 base_delay_ms: 1,
1463 },
1464 observer: Some(&observer),
1465 mocks: None,
1466 };
1467
1468 let rt = Runtime::new().unwrap();
1469 let result = rt.block_on(engine.execute(ctx, Value::Null)).unwrap();
1470 assert!(matches!(result.status, FlowStatus::Completed));
1471
1472 let starts = observer.starts.lock().unwrap();
1473 let ends = observer.ends.lock().unwrap();
1474 assert_eq!(starts.len(), 1);
1475 assert_eq!(ends.len(), 1);
1476 assert_eq!(ends[0], json!({ "message": "logged" }));
1477 }
1478}
1479
1480use tracing::Instrument;
1481
1482pub struct FlowContext<'a> {
1483 pub tenant: &'a str,
1484 pub pack_id: &'a str,
1485 pub flow_id: &'a str,
1486 pub node_id: Option<&'a str>,
1487 pub tool: Option<&'a str>,
1488 pub action: Option<&'a str>,
1489 pub session_id: Option<&'a str>,
1490 pub provider_id: Option<&'a str>,
1491 pub retry_config: RetryConfig,
1492 pub observer: Option<&'a dyn ExecutionObserver>,
1493 pub mocks: Option<&'a MockLayer>,
1494}
1495
1496#[derive(Copy, Clone)]
1497pub struct RetryConfig {
1498 pub max_attempts: u32,
1499 pub base_delay_ms: u64,
1500}
1501
1502fn should_retry(err: &anyhow::Error) -> bool {
1503 let lower = err.to_string().to_lowercase();
1504 lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
1505}
1506
1507impl From<FlowRetryConfig> for RetryConfig {
1508 fn from(value: FlowRetryConfig) -> Self {
1509 Self {
1510 max_attempts: value.max_attempts.max(1),
1511 base_delay_ms: value.base_delay_ms.max(50),
1512 }
1513 }
1514}