use std::collections::HashMap;
use std::env;
use std::error::Error as StdError;
use std::sync::Arc;
use std::time::Duration;

use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
use anyhow::{Context, Result, anyhow, bail};
use greentic_flow::ir::{FlowIR, NodeIR};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use serde_json::{Map as JsonMap, Value, json};
use tokio::task;
use tracing::Instrument;

use super::mocks::MockLayer;
use crate::config::{FlowRetryConfig, HostConfig};
use crate::pack::{FlowDescriptor, PackRuntime};
use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};

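/// Executes flows published by the registered packs: it resolves and caches
/// flow IR, dispatches nodes to the pack that owns each flow, and supports
/// pausing at `session.wait` nodes and resuming from a `FlowSnapshot`.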
pub struct FlowEngine {
    packs: Vec<Arc<PackRuntime>>,
    flows: Vec<FlowDescriptor>,
    flow_sources: HashMap<String, usize>,
    flow_ir: RwLock<HashMap<String, FlowIR>>,
    default_env: String,
}

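/// Serializable capture of a paused flow: which flow it belongs to, the node
/// to resume from, and the execution state accumulated so far.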
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FlowSnapshot {
    pub flow_id: String,
    pub next_node: String,
    pub state: ExecutionState,
}

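/// Describes why a flow paused and carries the snapshot needed to resume it.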
#[derive(Clone, Debug)]
pub struct FlowWait {
    pub reason: Option<String>,
    pub snapshot: FlowSnapshot,
}

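/// Terminal status of a flow run: either fully completed, or waiting on
/// external input with the details needed to resume.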
#[derive(Clone, Debug)]
pub enum FlowStatus {
    Completed,
    Waiting(FlowWait),
}

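/// Result of running (or resuming) a flow: the aggregated output value and
/// whether the run completed or is waiting.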
#[derive(Clone, Debug)]
pub struct FlowExecution {
    pub output: Value,
    pub status: FlowStatus,
}

impl FlowExecution {
    fn completed(output: Value) -> Self {
        Self {
            output,
            status: FlowStatus::Completed,
        }
    }

    fn waiting(output: Value, wait: FlowWait) -> Self {
        Self {
            output,
            status: FlowStatus::Waiting(wait),
        }
    }
}

impl FlowEngine {
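    /// Builds the engine by listing the flows exposed by every pack, recording
    /// which pack owns each flow id (later packs win on duplicate ids), and
    /// eagerly loading flow IR where possible.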
    pub async fn new(packs: Vec<Arc<PackRuntime>>, _config: Arc<HostConfig>) -> Result<Self> {
        let mut flow_sources = HashMap::new();
        let mut descriptors = Vec::new();
        for (idx, pack) in packs.iter().enumerate() {
            let flows = pack.list_flows().await?;
            for flow in flows {
                tracing::info!(
                    flow_id = %flow.id,
                    flow_type = %flow.flow_type,
                    pack_index = idx,
                    "registered flow"
                );
                flow_sources.insert(flow.id.clone(), idx);
                descriptors.retain(|existing: &FlowDescriptor| existing.id != flow.id);
                descriptors.push(flow);
            }
        }

        let mut ir_map = HashMap::new();
        for flow in &descriptors {
            if let Some(&pack_idx) = flow_sources.get(&flow.id) {
                let pack_clone = Arc::clone(&packs[pack_idx]);
                let flow_id = flow.id.clone();
                let task_flow_id = flow_id.clone();
                match task::spawn_blocking(move || pack_clone.load_flow_ir(&task_flow_id)).await {
                    Ok(Ok(ir)) => {
                        ir_map.insert(flow_id, ir);
                    }
                    Ok(Err(err)) => {
                        tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
                    }
                    Err(err) => {
                        tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
                    }
                }
            }
        }

        Ok(Self {
            packs,
            flows: descriptors,
            flow_sources,
            flow_ir: RwLock::new(ir_map),
            default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
        })
    }

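    /// Returns the cached IR for `flow_id`, loading it from the owning pack on
    /// a blocking task and caching the result on a miss.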
    async fn get_or_load_flow_ir(&self, flow_id: &str) -> Result<FlowIR> {
        if let Some(ir) = self.flow_ir.read().get(flow_id).cloned() {
            return Ok(ir);
        }

        let pack_idx = *self
            .flow_sources
            .get(flow_id)
            .with_context(|| format!("flow {flow_id} not registered"))?;
        let pack = Arc::clone(&self.packs[pack_idx]);
        let flow_id_owned = flow_id.to_string();
        let task_flow_id = flow_id_owned.clone();
        let ir = task::spawn_blocking(move || pack.load_flow_ir(&task_flow_id))
            .await
            .context("failed to join flow metadata task")??;
        self.flow_ir
            .write()
            .insert(flow_id_owned.clone(), ir.clone());
        Ok(ir)
    }

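    /// Runs the flow named in `ctx` against `input` under a `flow.execute`
    /// span, retrying errors that `should_retry` classifies as transient up to
    /// `retry_config.max_attempts`, backing off between attempts.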
    pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
        let span = tracing::info_span!(
            "flow.execute",
            tenant = tracing::field::Empty,
            flow_id = tracing::field::Empty,
            node_id = tracing::field::Empty,
            tool = tracing::field::Empty,
            action = tracing::field::Empty
        );
        annotate_span(
            &span,
            &FlowSpanAttributes {
                tenant: ctx.tenant,
                flow_id: ctx.flow_id,
                node_id: ctx.node_id,
                tool: ctx.tool,
                action: ctx.action,
            },
        );
        set_flow_context(
            &self.default_env,
            ctx.tenant,
            ctx.flow_id,
            ctx.node_id,
            ctx.provider_id,
            ctx.session_id,
        );
        let retry_config = ctx.retry_config;
        let original_input = input;
        async move {
            let mut attempt = 0u32;
            loop {
                attempt += 1;
                match self.execute_once(&ctx, original_input.clone()).await {
                    Ok(value) => return Ok(value),
                    Err(err) => {
                        if attempt >= retry_config.max_attempts || !should_retry(&err) {
                            return Err(err);
                        }
                        let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
                        tracing::warn!(
                            tenant = ctx.tenant,
                            flow_id = ctx.flow_id,
                            attempt,
                            max_attempts = retry_config.max_attempts,
                            delay_ms = delay,
                            error = %err,
                            "transient flow execution failure, backing off"
                        );
                        tokio::time::sleep(Duration::from_millis(delay)).await;
                    }
                }
            }
        }
        .instrument(span)
        .await
    }

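    /// Resumes a previously paused flow from `snapshot`, replacing its saved
    /// input with `input` and continuing from the recorded next node. Fails if
    /// the snapshot belongs to a different flow than `ctx.flow_id`.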
    pub async fn resume(
        &self,
        ctx: FlowContext<'_>,
        snapshot: FlowSnapshot,
        input: Value,
    ) -> Result<FlowExecution> {
        if snapshot.flow_id != ctx.flow_id {
            bail!(
                "snapshot flow {} does not match requested {}",
                snapshot.flow_id,
                ctx.flow_id
            );
        }
        let flow_ir = self.get_or_load_flow_ir(ctx.flow_id).await?;
        let mut state = snapshot.state;
        state.replace_input(input);
        self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
            .await
    }

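    /// Single, non-retrying execution of a flow with fresh state.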
    async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
        let flow_ir = self.get_or_load_flow_ir(ctx.flow_id).await?;
        let state = ExecutionState::new(input);
        self.drive_flow(ctx, flow_ir, state, None).await
    }

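    /// Walks the flow graph node by node, starting at the flow's start node
    /// (or `resume_from` when resuming), dispatching each node and following
    /// its routes until an `out` route, a dead end, or a `session.wait` pause.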
    async fn drive_flow(
        &self,
        ctx: &FlowContext<'_>,
        flow_ir: FlowIR,
        mut state: ExecutionState,
        resume_from: Option<String>,
    ) -> Result<FlowExecution> {
        let mut current = flow_ir
            .start
            .clone()
            .or_else(|| flow_ir.nodes.keys().next().cloned())
            .with_context(|| format!("flow {} has no start node", flow_ir.id))?;
        if let Some(resume) = resume_from {
            current = resume;
        }
        let mut final_payload = None;

        loop {
            let node = flow_ir
                .nodes
                .get(&current)
                .with_context(|| format!("node {current} not found"))?;

            let payload = node.payload_expr.clone();
            let observed_payload = payload.clone();
            let node_id = current.clone();
            let event = NodeEvent {
                context: ctx,
                node_id: &node_id,
                node,
                payload: &observed_payload,
            };
            if let Some(observer) = ctx.observer {
                observer.on_node_start(&event);
            }
            let DispatchOutcome {
                output,
                wait_reason,
            } = self
                .dispatch_node(ctx, &current, node, &mut state, payload)
                .await?;

            state.nodes.insert(current.clone(), output.clone());

            let mut next = None;
            let mut should_exit = false;
            for route in &node.routes {
                if route.out || matches!(route.to.as_deref(), Some("out")) {
                    final_payload = Some(output.payload.clone());
                    should_exit = true;
                    break;
                }
                if let Some(to) = &route.to {
                    next = Some(to.clone());
                    break;
                }
            }

            if let Some(wait_reason) = wait_reason {
                let resume_target = next.clone().ok_or_else(|| {
                    anyhow!("session.wait node {current} requires a non-empty route")
                })?;
                let mut snapshot_state = state.clone();
                snapshot_state.clear_egress();
                let snapshot = FlowSnapshot {
                    flow_id: ctx.flow_id.to_string(),
                    next_node: resume_target,
                    state: snapshot_state,
                };
                let output_value = state.clone().finalize_with(None);
                return Ok(FlowExecution::waiting(
                    output_value,
                    FlowWait {
                        reason: Some(wait_reason),
                        snapshot,
                    },
                ));
            }

            if should_exit {
                break;
            }

            match next {
                Some(n) => current = n,
                None => {
                    final_payload = Some(output.payload.clone());
                    break;
                }
            }
        }

        let payload = final_payload.unwrap_or(Value::Null);
        Ok(FlowExecution::completed(state.finalize_with(Some(payload))))
    }

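    /// Dispatches a single node by component kind: `component.exec` invokes a
    /// pack component, `flow.call` runs a sub-flow, `emit*` queues the payload
    /// for egress, and `session.wait` asks the flow to pause.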
    async fn dispatch_node(
        &self,
        ctx: &FlowContext<'_>,
        node_id: &str,
        node: &NodeIR,
        state: &mut ExecutionState,
        payload: Value,
    ) -> Result<DispatchOutcome> {
        match node.component.as_str() {
            "component.exec" => self
                .execute_component_exec(ctx, node_id, state, payload)
                .await
                .map(DispatchOutcome::complete),
            "flow.call" => self
                .execute_flow_call(ctx, payload)
                .await
                .map(DispatchOutcome::complete),
            component if component.starts_with("emit") => {
                state.push_egress(payload.clone());
                Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
            }
            "session.wait" => {
                let reason = extract_wait_reason(&payload);
                Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
            }
            other => bail!("unsupported node component: {other}"),
        }
    }

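    /// Handles a `flow.call` node: parses the target flow id and input from
    /// the payload and runs the sub-flow to completion. Sub-flows may not
    /// pause; a waiting sub-flow is reported as an error.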
    async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
        #[derive(Deserialize)]
        struct FlowCallPayload {
            #[serde(alias = "flow")]
            flow_id: String,
            #[serde(default)]
            input: Value,
        }

        let call: FlowCallPayload =
            serde_json::from_value(payload).context("invalid payload for flow.call node")?;
        if call.flow_id.trim().is_empty() {
            bail!("flow.call requires a non-empty flow_id");
        }

        let sub_input = if call.input.is_null() {
            Value::Null
        } else {
            call.input
        };

        let flow_id_owned = call.flow_id;
        let action = "flow.call";
        let sub_ctx = FlowContext {
            tenant: ctx.tenant,
            flow_id: flow_id_owned.as_str(),
            node_id: None,
            tool: ctx.tool,
            action: Some(action),
            session_id: ctx.session_id,
            provider_id: ctx.provider_id,
            retry_config: ctx.retry_config,
            observer: ctx.observer,
            mocks: ctx.mocks,
        };

        let execution = Box::pin(self.execute(sub_ctx, sub_input))
            .await
            .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
        match execution.status {
            FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
            FlowStatus::Waiting(wait) => bail!(
                "flow.call cannot pause (flow {} waiting {:?})",
                flow_id_owned,
                wait.reason
            ),
        }
    }

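    /// Handles a `component.exec` node: resolves the component reference and
    /// operation from the payload, injects the accumulated flow state under a
    /// `state` key when the input is a JSON object, and invokes the component
    /// on the pack that owns the current flow.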
    async fn execute_component_exec(
        &self,
        ctx: &FlowContext<'_>,
        node_id: &str,
        state: &ExecutionState,
        payload: Value,
    ) -> Result<NodeOutput> {
        #[derive(Deserialize)]
        struct ComponentPayload {
            #[serde(default, alias = "component_ref", alias = "component")]
            component: Option<String>,
            #[serde(alias = "op")]
            operation: Option<String>,
            #[serde(default)]
            input: Value,
            #[serde(default)]
            config: Value,
        }

        let payload: ComponentPayload =
            serde_json::from_value(payload).context("invalid payload for component.exec")?;
        let component_ref = payload
            .component
            .filter(|v| !v.trim().is_empty())
            .with_context(|| "component.exec requires a component_ref")?;
        let operation = payload
            .operation
            .filter(|v| !v.trim().is_empty())
            .with_context(|| "component.exec requires an operation")?;
        let mut input = payload.input;
        if let Value::Object(mut map) = input {
            map.entry("state".to_string())
                .or_insert_with(|| state.context());
            input = Value::Object(map);
        }
        let input_json = serde_json::to_string(&input)?;
        let config_json = if payload.config.is_null() {
            None
        } else {
            Some(serde_json::to_string(&payload.config)?)
        };

        let pack_idx = *self
            .flow_sources
            .get(ctx.flow_id)
            .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
        let pack = Arc::clone(&self.packs[pack_idx]);
        let exec_ctx = component_exec_ctx(ctx, node_id);
        let value = pack
            .invoke_component(
                &component_ref,
                exec_ctx,
                &operation,
                config_json,
                input_json,
            )
            .await?;

        Ok(NodeOutput::new(value))
    }

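    /// Returns every registered flow descriptor.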
    pub fn flows(&self) -> &[FlowDescriptor] {
        &self.flows
    }

    pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
        self.flows
            .iter()
            .find(|descriptor| descriptor.flow_type == flow_type)
    }

    pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
        self.flows
            .iter()
            .find(|descriptor| descriptor.id == flow_id)
    }
}

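/// Observer hook for node execution, useful for tracing or test assertions.
/// The engine invokes `on_node_start` before dispatching each node.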
pub trait ExecutionObserver: Send + Sync {
    fn on_node_start(&self, event: &NodeEvent<'_>);
    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
}

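/// Borrowed view of a node that is about to execute, passed to
/// `ExecutionObserver` callbacks.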
pub struct NodeEvent<'a> {
    pub context: &'a FlowContext<'a>,
    pub node_id: &'a str,
    pub node: &'a NodeIR,
    pub payload: &'a Value,
}

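/// Accumulated state of a flow run: the original input, every node's output,
/// and payloads queued for egress by `emit*` nodes. Serializable so it can be
/// carried inside a `FlowSnapshot` across a `session.wait` pause.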
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionState {
    input: Value,
    nodes: HashMap<String, NodeOutput>,
    egress: Vec<Value>,
}

impl ExecutionState {
    fn new(input: Value) -> Self {
        Self {
            input,
            nodes: HashMap::new(),
            egress: Vec::new(),
        }
    }

    fn context(&self) -> Value {
        let mut nodes = JsonMap::new();
        for (id, output) in &self.nodes {
            nodes.insert(
                id.clone(),
                json!({
                    "ok": output.ok,
                    "payload": output.payload.clone(),
                    "meta": output.meta.clone(),
                }),
            );
        }
        json!({
            "input": self.input.clone(),
            "nodes": nodes,
        })
    }

    fn push_egress(&mut self, payload: Value) {
        self.egress.push(payload);
    }

    fn replace_input(&mut self, input: Value) {
        self.input = input;
    }

    fn clear_egress(&mut self) {
        self.egress.clear();
    }

    fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
        if self.egress.is_empty() {
            return final_payload.unwrap_or(Value::Null);
        }
        let mut emitted = std::mem::take(&mut self.egress);
        if let Some(value) = final_payload {
            match value {
                Value::Null => {}
                Value::Array(items) => emitted.extend(items),
                other => emitted.push(other),
            }
        }
        Value::Array(emitted)
    }
}

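/// Output of a single node as exposed to later nodes through the `state`
/// context: a success flag, the payload, and optional metadata.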
#[derive(Clone, Debug, Serialize, Deserialize)]
struct NodeOutput {
    ok: bool,
    payload: Value,
    meta: Value,
}

impl NodeOutput {
    fn new(payload: Value) -> Self {
        Self {
            ok: true,
            payload,
            meta: Value::Null,
        }
    }
}

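/// Outcome of dispatching one node: its output plus an optional wait reason
/// when the node asked the flow to pause.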
struct DispatchOutcome {
    output: NodeOutput,
    wait_reason: Option<String>,
}

impl DispatchOutcome {
    fn complete(output: NodeOutput) -> Self {
        Self {
            output,
            wait_reason: None,
        }
    }

    fn wait(output: NodeOutput, reason: Option<String>) -> Self {
        Self {
            output,
            wait_reason: reason,
        }
    }
}

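/// Maps the host-side `FlowContext` onto the component API's execution
/// context, reusing the session id as both correlation id and idempotency key.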
fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
    ComponentExecCtx {
        tenant: ComponentTenantCtx {
            tenant: ctx.tenant.to_string(),
            team: None,
            user: ctx.provider_id.map(str::to_string),
            trace_id: None,
            correlation_id: ctx.session_id.map(str::to_string),
            deadline_unix_ms: None,
            attempt: 1,
            idempotency_key: ctx.session_id.map(str::to_string),
        },
        flow_id: ctx.flow_id.to_string(),
        node_id: Some(node_id.to_string()),
    }
}

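/// Extracts a human-readable wait reason from a `session.wait` payload:
/// either a bare string or the `reason` field of an object.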
fn extract_wait_reason(payload: &Value) -> Option<String> {
    match payload {
        Value::String(s) => Some(s.clone()),
        Value::Object(map) => map
            .get("reason")
            .and_then(Value::as_str)
            .map(|value| value.to_string()),
        _ => None,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn state_context_exposes_node_payloads() {
        let mut state = ExecutionState::new(json!({ "city": "London" }));
        state.nodes.insert(
            "forecast".to_string(),
            NodeOutput::new(json!({ "temp": "20C" })),
        );

        let ctx = state.context();
        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
    }

    #[test]
    fn finalize_wraps_emitted_payloads() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "first" }));
        state.push_egress(json!({ "text": "second" }));
        let result = state.finalize_with(Some(json!({ "text": "final" })));
        assert_eq!(
            result,
            json!([
                { "text": "first" },
                { "text": "second" },
                { "text": "final" }
            ])
        );
    }

    #[test]
    fn finalize_flattens_final_array() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "only" }));
        let result = state.finalize_with(Some(json!([
            { "text": "extra-1" },
            { "text": "extra-2" }
        ])));
        assert_eq!(
            result,
            json!([
                { "text": "only" },
                { "text": "extra-1" },
                { "text": "extra-2" }
            ])
        );
    }
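
    // Added check (a minimal sketch): extract_wait_reason accepts either a
    // bare string payload or an object carrying a "reason" field, and yields
    // None for anything else, per the helper defined above.
    #[test]
    fn wait_reason_extracted_from_string_or_object() {
        assert_eq!(
            extract_wait_reason(&json!("awaiting-user")),
            Some("awaiting-user".to_string())
        );
        assert_eq!(
            extract_wait_reason(&json!({ "reason": "approval" })),
            Some("approval".to_string())
        );
        assert_eq!(extract_wait_reason(&json!(42)), None);
    }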
}

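/// Borrowed per-invocation context: who is running which flow, plus retry
/// settings, an optional observer, and an optional mock layer.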
pub struct FlowContext<'a> {
    pub tenant: &'a str,
    pub flow_id: &'a str,
    pub node_id: Option<&'a str>,
    pub tool: Option<&'a str>,
    pub action: Option<&'a str>,
    pub session_id: Option<&'a str>,
    pub provider_id: Option<&'a str>,
    pub retry_config: RetryConfig,
    pub observer: Option<&'a dyn ExecutionObserver>,
    pub mocks: Option<&'a MockLayer>,
}

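/// Retry policy applied by `FlowEngine::execute`: the maximum number of
/// attempts and the base delay fed into `backoff_delay_ms`.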
#[derive(Copy, Clone)]
pub struct RetryConfig {
    pub max_attempts: u32,
    pub base_delay_ms: u64,
}

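/// Heuristic retry check: treats errors whose message mentions "transient",
/// "unavailable", or "internal" as retryable.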
fn should_retry(err: &anyhow::Error) -> bool {
    let lower = err.to_string().to_lowercase();
    lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
}

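/// Converts the host-level retry settings into the engine's policy, clamping
/// to at least one attempt and a 50 ms base delay.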
impl From<FlowRetryConfig> for RetryConfig {
    fn from(value: FlowRetryConfig) -> Self {
        Self {
            max_attempts: value.max_attempts.max(1),
            base_delay_ms: value.base_delay_ms.max(50),
        }
    }
}