use std::collections::HashMap;
use std::env;
use std::error::Error as StdError;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, Result, anyhow, bail};
use greentic_flow::ir::{FlowIR, NodeIR};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use serde_json::{Map as JsonMap, Value, json};
use tokio::task;
use tracing::Instrument;

use super::mocks::MockLayer;
use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
use crate::config::{FlowRetryConfig, HostConfig};
use crate::pack::{FlowDescriptor, PackRuntime};
use crate::telemetry::{FlowSpanAttributes, annotate_span, backoff_delay_ms, set_flow_context};

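/// Executes flows loaded from the registered packs.
///
/// Flow IR is cached per flow id; when several packs register the same flow id,
/// the pack registered last wins.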
pub struct FlowEngine {
    packs: Vec<Arc<PackRuntime>>,
    flows: Vec<FlowDescriptor>,
    flow_sources: HashMap<String, usize>,
    flow_ir: RwLock<HashMap<String, FlowIR>>,
    default_env: String,
}

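/// Serializable checkpoint of a paused flow, carrying everything needed to resume it.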
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FlowSnapshot {
    pub flow_id: String,
    pub next_node: String,
    pub state: ExecutionState,
}

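/// Why a flow paused, plus the snapshot required to resume it.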
#[derive(Clone, Debug)]
pub struct FlowWait {
    pub reason: Option<String>,
    pub snapshot: FlowSnapshot,
}

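/// Outcome of a flow run: either it finished or it is waiting to be resumed.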
#[derive(Clone, Debug)]
pub enum FlowStatus {
    Completed,
    Waiting(FlowWait),
}

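/// Output produced by a flow run together with its completion status.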
#[derive(Clone, Debug)]
pub struct FlowExecution {
    pub output: Value,
    pub status: FlowStatus,
}

impl FlowExecution {
    fn completed(output: Value) -> Self {
        Self {
            output,
            status: FlowStatus::Completed,
        }
    }

    fn waiting(output: Value, wait: FlowWait) -> Self {
        Self {
            output,
            status: FlowStatus::Waiting(wait),
        }
    }
}

impl FlowEngine {
    pub async fn new(packs: Vec<Arc<PackRuntime>>, _config: Arc<HostConfig>) -> Result<Self> {
        let mut flow_sources = HashMap::new();
        let mut descriptors = Vec::new();
        for (idx, pack) in packs.iter().enumerate() {
            let flows = pack.list_flows().await?;
            for flow in flows {
                tracing::info!(
                    flow_id = %flow.id,
                    flow_type = %flow.flow_type,
                    pack_index = idx,
                    "registered flow"
                );
                flow_sources.insert(flow.id.clone(), idx);
                descriptors.retain(|existing: &FlowDescriptor| existing.id != flow.id);
                descriptors.push(flow);
            }
        }

        let mut ir_map = HashMap::new();
        for flow in &descriptors {
            if let Some(&pack_idx) = flow_sources.get(&flow.id) {
                let pack_clone = Arc::clone(&packs[pack_idx]);
                let flow_id = flow.id.clone();
                let task_flow_id = flow_id.clone();
                match task::spawn_blocking(move || pack_clone.load_flow_ir(&task_flow_id)).await {
                    Ok(Ok(ir)) => {
                        ir_map.insert(flow_id, ir);
                    }
                    Ok(Err(err)) => {
                        tracing::warn!(flow_id = %flow.id, error = %err, "failed to load flow metadata");
                    }
                    Err(err) => {
                        tracing::warn!(flow_id = %flow.id, error = %err, "join error loading flow metadata");
                    }
                }
            }
        }

        Ok(Self {
            packs,
            flows: descriptors,
            flow_sources,
            flow_ir: RwLock::new(ir_map),
            default_env: env::var("GREENTIC_ENV").unwrap_or_else(|_| "local".to_string()),
        })
    }

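    /// Returns the cached IR for `flow_id`, loading it from the owning pack on a
    /// blocking task if it was not preloaded during construction.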
    async fn get_or_load_flow_ir(&self, flow_id: &str) -> Result<FlowIR> {
        if let Some(ir) = self.flow_ir.read().get(flow_id).cloned() {
            return Ok(ir);
        }

        let pack_idx = *self
            .flow_sources
            .get(flow_id)
            .with_context(|| format!("flow {flow_id} not registered"))?;
        let pack = Arc::clone(&self.packs[pack_idx]);
        let flow_id_owned = flow_id.to_string();
        let task_flow_id = flow_id_owned.clone();
        let ir = task::spawn_blocking(move || pack.load_flow_ir(&task_flow_id))
            .await
            .context("failed to join flow metadata task")??;
        self.flow_ir
            .write()
            .insert(flow_id_owned.clone(), ir.clone());
        Ok(ir)
    }

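    /// Runs a flow from its start node, retrying transient failures up to the
    /// context's attempt budget with a backoff delay between attempts.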
    pub async fn execute(&self, ctx: FlowContext<'_>, input: Value) -> Result<FlowExecution> {
        let span = tracing::info_span!(
            "flow.execute",
            tenant = tracing::field::Empty,
            flow_id = tracing::field::Empty,
            node_id = tracing::field::Empty,
            tool = tracing::field::Empty,
            action = tracing::field::Empty
        );
        annotate_span(
            &span,
            &FlowSpanAttributes {
                tenant: ctx.tenant,
                flow_id: ctx.flow_id,
                node_id: ctx.node_id,
                tool: ctx.tool,
                action: ctx.action,
            },
        );
        set_flow_context(
            &self.default_env,
            ctx.tenant,
            ctx.flow_id,
            ctx.node_id,
            ctx.provider_id,
            ctx.session_id,
        );
        let retry_config = ctx.retry_config;
        let original_input = input;
        async move {
            let mut attempt = 0u32;
            loop {
                attempt += 1;
                match self.execute_once(&ctx, original_input.clone()).await {
                    Ok(value) => return Ok(value),
                    Err(err) => {
                        if attempt >= retry_config.max_attempts || !should_retry(&err) {
                            return Err(err);
                        }
                        let delay = backoff_delay_ms(retry_config.base_delay_ms, attempt - 1);
                        tracing::warn!(
                            tenant = ctx.tenant,
                            flow_id = ctx.flow_id,
                            attempt,
                            max_attempts = retry_config.max_attempts,
                            delay_ms = delay,
                            error = %err,
                            "transient flow execution failure, backing off"
                        );
                        tokio::time::sleep(Duration::from_millis(delay)).await;
                    }
                }
            }
        }
        .instrument(span)
        .await
    }

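    /// Resumes a paused flow from the node recorded in `snapshot`, replacing the
    /// saved input with `input`.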
    pub async fn resume(
        &self,
        ctx: FlowContext<'_>,
        snapshot: FlowSnapshot,
        input: Value,
    ) -> Result<FlowExecution> {
        if snapshot.flow_id != ctx.flow_id {
            bail!(
                "snapshot flow {} does not match requested {}",
                snapshot.flow_id,
                ctx.flow_id
            );
        }
        let flow_ir = self.get_or_load_flow_ir(ctx.flow_id).await?;
        let mut state = snapshot.state;
        state.replace_input(input);
        self.drive_flow(&ctx, flow_ir, state, Some(snapshot.next_node))
            .await
    }

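    /// A single attempt of a flow run; `execute` wraps this in the retry loop.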
    async fn execute_once(&self, ctx: &FlowContext<'_>, input: Value) -> Result<FlowExecution> {
        let flow_ir = self.get_or_load_flow_ir(ctx.flow_id).await?;
        let state = ExecutionState::new(input);
        self.drive_flow(ctx, flow_ir, state, None).await
    }

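    /// Walks the flow graph node by node, following routes until an `out` route is
    /// reached, a node has no successor, or a `session.wait` node pauses the run.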
    async fn drive_flow(
        &self,
        ctx: &FlowContext<'_>,
        flow_ir: FlowIR,
        mut state: ExecutionState,
        resume_from: Option<String>,
    ) -> Result<FlowExecution> {
        let mut current = flow_ir
            .start
            .clone()
            .or_else(|| flow_ir.nodes.keys().next().cloned())
            .with_context(|| format!("flow {} has no start node", flow_ir.id))?;
        if let Some(resume) = resume_from {
            current = resume;
        }
        let mut final_payload = None;

        loop {
            let node = flow_ir
                .nodes
                .get(&current)
                .with_context(|| format!("node {current} not found"))?;

            let payload = node.payload_expr.clone();
            let observed_payload = payload.clone();
            let node_id = current.clone();
            let event = NodeEvent {
                context: ctx,
                node_id: &node_id,
                node,
                payload: &observed_payload,
            };
            if let Some(observer) = ctx.observer {
                observer.on_node_start(&event);
            }
            let DispatchOutcome {
                output,
                wait_reason,
            } = self
                .dispatch_node(ctx, &current, node, &mut state, payload)
                .await?;

            state.nodes.insert(current.clone(), output.clone());

            let mut next = None;
            let mut should_exit = false;
            for route in &node.routes {
                if route.out || matches!(route.to.as_deref(), Some("out")) {
                    final_payload = Some(output.payload.clone());
                    should_exit = true;
                    break;
                }
                if let Some(to) = &route.to {
                    next = Some(to.clone());
                    break;
                }
            }

            if let Some(wait_reason) = wait_reason {
                let resume_target = next.clone().ok_or_else(|| {
                    anyhow!("session.wait node {current} requires a non-empty route")
                })?;
                let mut snapshot_state = state.clone();
                snapshot_state.clear_egress();
                let snapshot = FlowSnapshot {
                    flow_id: ctx.flow_id.to_string(),
                    next_node: resume_target,
                    state: snapshot_state,
                };
                let output_value = state.clone().finalize_with(None);
                return Ok(FlowExecution::waiting(
                    output_value,
                    FlowWait {
                        reason: Some(wait_reason),
                        snapshot,
                    },
                ));
            }

            if should_exit {
                break;
            }

            match next {
                Some(n) => current = n,
                None => {
                    final_payload = Some(output.payload.clone());
                    break;
                }
            }
        }

        let payload = final_payload.unwrap_or(Value::Null);
        Ok(FlowExecution::completed(state.finalize_with(Some(payload))))
    }

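    /// Dispatches a node to its handler based on its component name:
    /// `component.exec`, `flow.call`, `emit*`, or `session.wait`.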
    async fn dispatch_node(
        &self,
        ctx: &FlowContext<'_>,
        node_id: &str,
        node: &NodeIR,
        state: &mut ExecutionState,
        payload: Value,
    ) -> Result<DispatchOutcome> {
        match node.component.as_str() {
            "component.exec" => self
                .execute_component_exec(ctx, node_id, state, payload)
                .await
                .map(DispatchOutcome::complete),
            "flow.call" => self
                .execute_flow_call(ctx, payload)
                .await
                .map(DispatchOutcome::complete),
            component if component.starts_with("emit") => {
                state.push_egress(payload.clone());
                Ok(DispatchOutcome::complete(NodeOutput::new(payload)))
            }
            "session.wait" => {
                let reason = extract_wait_reason(&payload);
                Ok(DispatchOutcome::wait(NodeOutput::new(payload), reason))
            }
            other => bail!(
                "unsupported node component `{other}` in flow {} node {}",
                ctx.flow_id,
                node_id
            ),
        }
    }

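    /// Invokes another registered flow inline. The sub-flow must run to completion;
    /// a sub-flow that pauses on `session.wait` is reported as an error.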
    async fn execute_flow_call(&self, ctx: &FlowContext<'_>, payload: Value) -> Result<NodeOutput> {
        #[derive(Deserialize)]
        struct FlowCallPayload {
            #[serde(alias = "flow")]
            flow_id: String,
            #[serde(default)]
            input: Value,
        }

        let call: FlowCallPayload =
            serde_json::from_value(payload).context("invalid payload for flow.call node")?;
        if call.flow_id.trim().is_empty() {
            bail!("flow.call requires a non-empty flow_id");
        }

        let sub_input = call.input;

        let flow_id_owned = call.flow_id;
        let action = "flow.call";
        let sub_ctx = FlowContext {
            tenant: ctx.tenant,
            flow_id: flow_id_owned.as_str(),
            node_id: None,
            tool: ctx.tool,
            action: Some(action),
            session_id: ctx.session_id,
            provider_id: ctx.provider_id,
            retry_config: ctx.retry_config,
            observer: ctx.observer,
            mocks: ctx.mocks,
        };

        let execution = Box::pin(self.execute(sub_ctx, sub_input))
            .await
            .with_context(|| format!("flow.call failed for {}", flow_id_owned))?;
        match execution.status {
            FlowStatus::Completed => Ok(NodeOutput::new(execution.output)),
            FlowStatus::Waiting(wait) => bail!(
                "flow.call cannot pause (flow {} waiting {:?})",
                flow_id_owned,
                wait.reason
            ),
        }
    }

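    /// Executes a `component.exec` node by invoking the referenced component in the
    /// pack that owns the current flow, injecting the accumulated execution state
    /// under the input's `state` key when it is absent.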
    async fn execute_component_exec(
        &self,
        ctx: &FlowContext<'_>,
        node_id: &str,
        state: &ExecutionState,
        payload: Value,
    ) -> Result<NodeOutput> {
        #[derive(Deserialize)]
        struct ComponentPayload {
            #[serde(default, alias = "component_ref", alias = "component")]
            component: Option<String>,
            #[serde(alias = "op")]
            operation: Option<String>,
            #[serde(default)]
            input: Value,
            #[serde(default)]
            config: Value,
        }

        let payload: ComponentPayload =
            serde_json::from_value(payload).context("invalid payload for component.exec")?;
        let component_ref = payload
            .component
            .filter(|v| !v.trim().is_empty())
            .with_context(|| "component.exec requires a component_ref")?;
        let operation = payload
            .operation
            .filter(|v| !v.trim().is_empty())
            .with_context(|| "component.exec requires an operation")?;
        let mut input = payload.input;
        if let Value::Object(mut map) = input {
            map.entry("state".to_string())
                .or_insert_with(|| state.context());
            input = Value::Object(map);
        }
        let input_json = serde_json::to_string(&input)?;
        let config_json = if payload.config.is_null() {
            None
        } else {
            Some(serde_json::to_string(&payload.config)?)
        };

        let pack_idx = *self
            .flow_sources
            .get(ctx.flow_id)
            .with_context(|| format!("flow {} not registered", ctx.flow_id))?;
        let pack = Arc::clone(&self.packs[pack_idx]);
        let exec_ctx = component_exec_ctx(ctx, node_id);
        let value = pack
            .invoke_component(
                &component_ref,
                exec_ctx,
                &operation,
                config_json,
                input_json,
            )
            .await?;

        Ok(NodeOutput::new(value))
    }

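    /// All flow descriptors registered across the loaded packs.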
    pub fn flows(&self) -> &[FlowDescriptor] {
        &self.flows
    }

    pub fn flow_by_type(&self, flow_type: &str) -> Option<&FlowDescriptor> {
        self.flows
            .iter()
            .find(|descriptor| descriptor.flow_type == flow_type)
    }

    pub fn flow_by_id(&self, flow_id: &str) -> Option<&FlowDescriptor> {
        self.flows
            .iter()
            .find(|descriptor| descriptor.id == flow_id)
    }
}

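/// Callbacks for observing node execution, e.g. for tracing or test assertions.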
pub trait ExecutionObserver: Send + Sync {
    fn on_node_start(&self, event: &NodeEvent<'_>);
    fn on_node_end(&self, event: &NodeEvent<'_>, output: &Value);
    fn on_node_error(&self, event: &NodeEvent<'_>, error: &dyn StdError);
}

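/// Borrowed view of the node currently being executed, handed to observers.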
pub struct NodeEvent<'a> {
    pub context: &'a FlowContext<'a>,
    pub node_id: &'a str,
    pub node: &'a NodeIR,
    pub payload: &'a Value,
}

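/// Mutable per-run state: the flow input, each node's output, and any payloads
/// emitted by `emit*` nodes.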
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutionState {
    input: Value,
    nodes: HashMap<String, NodeOutput>,
    egress: Vec<Value>,
}

impl ExecutionState {
    fn new(input: Value) -> Self {
        Self {
            input,
            nodes: HashMap::new(),
            egress: Vec::new(),
        }
    }

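    /// Renders the state as JSON (`{ "input": ..., "nodes": { id: { ok, payload, meta } } }`)
    /// for injection into component inputs.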
    fn context(&self) -> Value {
        let mut nodes = JsonMap::new();
        for (id, output) in &self.nodes {
            nodes.insert(
                id.clone(),
                json!({
                    "ok": output.ok,
                    "payload": output.payload.clone(),
                    "meta": output.meta.clone(),
                }),
            );
        }
        json!({
            "input": self.input.clone(),
            "nodes": nodes,
        })
    }

    fn push_egress(&mut self, payload: Value) {
        self.egress.push(payload);
    }

    fn replace_input(&mut self, input: Value) {
        self.input = input;
    }

    fn clear_egress(&mut self) {
        self.egress.clear();
    }

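    /// Produces the flow's final output. With no emitted payloads this is just the
    /// final payload; otherwise the emitted payloads form an array, a final array
    /// is flattened into it, and a null final payload is dropped.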
    fn finalize_with(mut self, final_payload: Option<Value>) -> Value {
        if self.egress.is_empty() {
            return final_payload.unwrap_or(Value::Null);
        }
        let mut emitted = std::mem::take(&mut self.egress);
        if let Some(value) = final_payload {
            match value {
                Value::Null => {}
                Value::Array(items) => emitted.extend(items),
                other => emitted.push(other),
            }
        }
        Value::Array(emitted)
    }
}

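/// Output recorded for a single executed node.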
#[derive(Clone, Debug, Serialize, Deserialize)]
struct NodeOutput {
    ok: bool,
    payload: Value,
    meta: Value,
}

impl NodeOutput {
    fn new(payload: Value) -> Self {
        Self {
            ok: true,
            payload,
            meta: Value::Null,
        }
    }
}

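/// Result of dispatching one node: its output plus a wait reason when the node was
/// a `session.wait`.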
struct DispatchOutcome {
    output: NodeOutput,
    wait_reason: Option<String>,
}

impl DispatchOutcome {
    fn complete(output: NodeOutput) -> Self {
        Self {
            output,
            wait_reason: None,
        }
    }

    fn wait(output: NodeOutput, reason: Option<String>) -> Self {
        Self {
            output,
            wait_reason: reason,
        }
    }
}

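/// Builds the component-API execution context from the flow context, reusing the
/// session id as both the correlation and idempotency key.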
fn component_exec_ctx(ctx: &FlowContext<'_>, node_id: &str) -> ComponentExecCtx {
    ComponentExecCtx {
        tenant: ComponentTenantCtx {
            tenant: ctx.tenant.to_string(),
            team: None,
            user: ctx.provider_id.map(str::to_string),
            trace_id: None,
            correlation_id: ctx.session_id.map(str::to_string),
            deadline_unix_ms: None,
            attempt: 1,
            idempotency_key: ctx.session_id.map(str::to_string),
        },
        flow_id: ctx.flow_id.to_string(),
        node_id: Some(node_id.to_string()),
    }
}

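/// Reads a wait reason from a `session.wait` payload: either a bare string or the
/// `reason` field of an object.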
fn extract_wait_reason(payload: &Value) -> Option<String> {
    match payload {
        Value::String(s) => Some(s.clone()),
        Value::Object(map) => map
            .get("reason")
            .and_then(Value::as_str)
            .map(|value| value.to_string()),
        _ => None,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn context_exposes_node_outputs() {
        let mut state = ExecutionState::new(json!({ "city": "London" }));
        state.nodes.insert(
            "forecast".to_string(),
            NodeOutput::new(json!({ "temp": "20C" })),
        );

        let ctx = state.context();
        assert_eq!(ctx["nodes"]["forecast"]["payload"]["temp"], json!("20C"));
    }

    #[test]
    fn finalize_wraps_emitted_payloads() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "first" }));
        state.push_egress(json!({ "text": "second" }));
        let result = state.finalize_with(Some(json!({ "text": "final" })));
        assert_eq!(
            result,
            json!([
                { "text": "first" },
                { "text": "second" },
                { "text": "final" }
            ])
        );
    }

    #[test]
    fn finalize_flattens_final_array() {
        let mut state = ExecutionState::new(json!({}));
        state.push_egress(json!({ "text": "only" }));
        let result = state.finalize_with(Some(json!([
            { "text": "extra-1" },
            { "text": "extra-2" }
        ])));
        assert_eq!(
            result,
            json!([
                { "text": "only" },
                { "text": "extra-1" },
                { "text": "extra-2" }
            ])
        );
    }
}
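/// Per-invocation context threaded through flow execution: tenant and session
/// identity, telemetry attributes, retry policy, and optional observer and mocks.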
pub struct FlowContext<'a> {
    pub tenant: &'a str,
    pub flow_id: &'a str,
    pub node_id: Option<&'a str>,
    pub tool: Option<&'a str>,
    pub action: Option<&'a str>,
    pub session_id: Option<&'a str>,
    pub provider_id: Option<&'a str>,
    pub retry_config: RetryConfig,
    pub observer: Option<&'a dyn ExecutionObserver>,
    pub mocks: Option<&'a MockLayer>,
}

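/// Retry policy for `FlowEngine::execute`: the attempt budget and base backoff delay.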
#[derive(Copy, Clone)]
pub struct RetryConfig {
    pub max_attempts: u32,
    pub base_delay_ms: u64,
}

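/// Heuristic retry check: error messages mentioning "transient", "unavailable", or
/// "internal" are treated as retryable.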
fn should_retry(err: &anyhow::Error) -> bool {
    let lower = err.to_string().to_lowercase();
    lower.contains("transient") || lower.contains("unavailable") || lower.contains("internal")
}

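/// Converts host configuration into engine retry settings, clamping to at least one
/// attempt and a 50 ms base delay.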
impl From<FlowRetryConfig> for RetryConfig {
    fn from(value: FlowRetryConfig) -> Self {
        Self {
            max_attempts: value.max_attempts.max(1),
            base_delay_ms: value.base_delay_ms.max(50),
        }
    }
}