greentic_flow/
lib.rs

1//! Downstream runtimes must set the current tenant telemetry context via
2//! `greentic_types::telemetry::set_current_tenant_ctx` before executing flows
3//! (for example, prior to `FlowEngine::run` in the host runner).
4#![forbid(unsafe_code)]
5#![allow(clippy::result_large_err)]
6
7pub mod add_step;
8pub mod component_catalog;
9pub mod config_flow;
10pub mod error;
11pub mod flow_bundle;
12pub mod flow_ir;
13pub mod ir;
14pub mod json_output;
15pub mod lint;
16pub mod loader;
17pub mod model;
18pub mod path_safety;
19pub mod questions;
20pub mod registry;
21pub mod resolve;
22pub mod resolve_summary;
23pub mod splice;
24pub mod template;
25pub mod util;
26
27pub use flow_bundle::{
28    ComponentPin, FlowBundle, NodeRef, blake3_hex, canonicalize_json, extract_component_pins,
29    load_and_validate_bundle, load_and_validate_bundle_with_flow,
30};
31pub use json_output::{JsonDiagnostic, LintJsonOutput, lint_to_stdout_json};
32pub use splice::{NEXT_NODE_PLACEHOLDER, splice_node_after};
33
34use crate::{error::Result, model::FlowDoc};
35use greentic_types::{
36    ComponentId, Flow, FlowComponentRef, FlowId, FlowKind, FlowMetadata, InputMapping, Node,
37    NodeId, OutputMapping, Routing, TelemetryHints, flow::FlowHasher,
38};
39use indexmap::IndexMap;
40use serde_json::Value;
41use std::collections::{BTreeMap, BTreeSet};
42use std::path::Path;
43
44/// Map a YAML flow type string to [`FlowKind`].
45pub fn map_flow_type(flow_type: &str) -> Result<FlowKind> {
46    match flow_type {
47        "messaging" => Ok(FlowKind::Messaging),
48        "event" | "events" => Ok(FlowKind::Event),
49        "component-config" => Ok(FlowKind::ComponentConfig),
50        "job" => Ok(FlowKind::Job),
51        "http" => Ok(FlowKind::Http),
52        other => Err(crate::error::FlowError::UnknownFlowType {
53            flow_type: other.to_string(),
54            location: crate::error::FlowErrorLocation::at_path("type"),
55        }),
56    }
57}
58
59/// Compile a validated [`FlowDoc`] into the canonical [`Flow`] model.
60pub fn compile_flow(doc: FlowDoc) -> Result<Flow> {
61    let kind = map_flow_type(&doc.flow_type)?;
62    let mut entrypoints = doc.entrypoints.clone();
63    if let Some(entry) = resolve_entry(&doc) {
64        entrypoints
65            .entry("default".to_string())
66            .or_insert_with(|| Value::String(entry));
67    }
68
69    let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
70    for (node_id_str, node_doc) in doc.nodes.iter() {
71        let node_id = NodeId::new(node_id_str.as_str()).map_err(|e| {
72            crate::error::FlowError::InvalidIdentifier {
73                kind: "node",
74                value: node_id_str.clone(),
75                detail: e.to_string(),
76                location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
77            }
78        })?;
79        let routing = compile_routing(&node_doc.routing, &doc.nodes, node_id_str)?;
80        let telemetry = node_doc
81            .telemetry
82            .as_ref()
83            .map(|t| TelemetryHints {
84                span_name: t.span_name.clone(),
85                attributes: t.attributes.clone(),
86                sampling: t.sampling.clone(),
87            })
88            .unwrap_or_default();
89        // V2: single op key in raw.
90        let mut op_key: Option<String> = None;
91        let mut payload: Option<Value> = None;
92        for (k, v) in &node_doc.raw {
93            op_key = Some(k.clone());
94            payload = Some(v.clone());
95        }
96        let output_mapping = node_doc
97            .raw
98            .get("output")
99            .cloned()
100            .unwrap_or_else(|| Value::Object(Default::default()));
101        let operation = op_key.ok_or_else(|| crate::error::FlowError::Internal {
102            message: format!("node '{node_id_str}' missing operation key"),
103            location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
104        })?;
105        let is_builtin = matches!(operation.as_str(), "questions" | "template");
106        let is_legacy = doc.schema_version.unwrap_or(1) < 2;
107        let (component_id, op_field) = if is_builtin || is_legacy {
108            (operation.clone(), None)
109        } else {
110            ("component.exec".to_string(), Some(operation.clone()))
111        };
112        let node = Node {
113            id: node_id.clone(),
114            component: FlowComponentRef {
115                id: ComponentId::new(&component_id).unwrap(),
116                pack_alias: None,
117                operation: op_field,
118            },
119            input: InputMapping {
120                mapping: payload.unwrap_or_else(|| Value::Object(Default::default())),
121            },
122            output: OutputMapping {
123                mapping: output_mapping,
124            },
125            routing,
126            telemetry,
127        };
128        nodes.insert(node_id, node);
129    }
130
131    let flow_id =
132        FlowId::new(doc.id.as_str()).map_err(|e| crate::error::FlowError::InvalidIdentifier {
133            kind: "flow",
134            value: doc.id.clone(),
135            detail: e.to_string(),
136            location: crate::error::FlowErrorLocation::at_path("id"),
137        })?;
138
139    let entrypoints_map: BTreeMap<String, Value> = entrypoints.into_iter().collect();
140
141    Ok(Flow {
142        schema_version: "flow-v1".to_string(),
143        id: flow_id,
144        kind,
145        entrypoints: entrypoints_map,
146        nodes,
147        metadata: FlowMetadata {
148            title: doc.title,
149            description: doc.description,
150            tags: doc.tags.into_iter().collect::<BTreeSet<_>>(),
151            extra: doc.parameters,
152        },
153    })
154}
155
156/// Compile YGTC YAML text into [`Flow`].
157pub fn compile_ygtc_str(src: &str) -> Result<Flow> {
158    let doc = loader::load_ygtc_from_str(src)?;
159    compile_flow(doc)
160}
161
162/// Compile a YGTC file into [`Flow`].
163pub fn compile_ygtc_file(path: &Path) -> Result<Flow> {
164    let doc = loader::load_ygtc_from_path(path)?;
165    compile_flow(doc)
166}
167
168fn compile_routing(
169    raw: &Value,
170    nodes: &IndexMap<String, crate::model::NodeDoc>,
171    node_id: &str,
172) -> Result<Routing> {
173    #[derive(serde::Deserialize)]
174    struct RouteDoc {
175        #[serde(default)]
176        to: Option<String>,
177        #[serde(default)]
178        out: Option<bool>,
179        #[serde(default)]
180        status: Option<String>,
181        #[serde(default)]
182        reply: Option<bool>,
183    }
184
185    let routes: Vec<RouteDoc> = if raw.is_null() {
186        Vec::new()
187    } else if let Some(shorthand) = raw.as_str() {
188        match shorthand {
189            "out" => vec![RouteDoc {
190                to: None,
191                out: Some(true),
192                status: None,
193                reply: None,
194            }],
195            "reply" => vec![RouteDoc {
196                to: None,
197                out: None,
198                status: None,
199                reply: Some(true),
200            }],
201            other => {
202                return Err(crate::error::FlowError::Routing {
203                    node_id: node_id.to_string(),
204                    message: format!("invalid routing shorthand '{other}'"),
205                    location: crate::error::FlowErrorLocation::at_path(format!(
206                        "nodes.{node_id}.routing"
207                    )),
208                });
209            }
210        }
211    } else {
212        serde_json::from_value(raw.clone()).map_err(|e| crate::error::FlowError::Routing {
213            node_id: node_id.to_string(),
214            message: e.to_string(),
215            location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id}.routing")),
216        })?
217    };
218
219    if routes.len() == 1 {
220        let route = &routes[0];
221        let is_out = route.out.unwrap_or(false);
222        if route.reply.unwrap_or(false) {
223            return Ok(Routing::Reply);
224        }
225        if let Some(to) = &route.to {
226            if to == "out" || is_out {
227                return Ok(Routing::End);
228            }
229            if !nodes.contains_key(to) {
230                return Err(crate::error::FlowError::MissingNode {
231                    target: to.clone(),
232                    node_id: node_id.to_string(),
233                    location: crate::error::FlowErrorLocation::at_path(format!(
234                        "nodes.{node_id}.routing"
235                    )),
236                });
237            }
238            return Ok(Routing::Next {
239                node_id: NodeId::new(to.as_str()).map_err(|e| {
240                    crate::error::FlowError::InvalidIdentifier {
241                        kind: "node",
242                        value: to.clone(),
243                        detail: e.to_string(),
244                        location: crate::error::FlowErrorLocation::at_path(format!(
245                            "nodes.{node_id}.routing"
246                        )),
247                    }
248                })?,
249            });
250        }
251        if is_out {
252            return Ok(Routing::End);
253        }
254        if route.status.is_some() {
255            // single status route without a destination is ambiguous
256            return Ok(Routing::Custom(raw.clone()));
257        }
258    }
259
260    if routes.is_empty() {
261        return Ok(Routing::End);
262    }
263
264    // Attempt to build a Branch when multiple status routes are present.
265    if routes.len() >= 2 {
266        use std::collections::BTreeMap;
267        let mut on_status: BTreeMap<String, NodeId> = BTreeMap::new();
268        let mut default: Option<NodeId> = None;
269        let mut any_status = false;
270        for route in &routes {
271            if route.reply.unwrap_or(false) || route.out.unwrap_or(false) {
272                return Ok(Routing::Custom(raw.clone()));
273            }
274            let to = match &route.to {
275                Some(t) => t,
276                None => return Ok(Routing::Custom(raw.clone())),
277            };
278            if !nodes.contains_key(to) {
279                return Err(crate::error::FlowError::MissingNode {
280                    target: to.clone(),
281                    node_id: node_id.to_string(),
282                    location: crate::error::FlowErrorLocation::at_path(format!(
283                        "nodes.{node_id}.routing"
284                    )),
285                });
286            }
287            let to_id = NodeId::new(to.as_str()).map_err(|e| {
288                crate::error::FlowError::InvalidIdentifier {
289                    kind: "node",
290                    value: to.clone(),
291                    detail: e.to_string(),
292                    location: crate::error::FlowErrorLocation::at_path(format!(
293                        "nodes.{node_id}.routing"
294                    )),
295                }
296            })?;
297            if let Some(status) = &route.status {
298                any_status = true;
299                on_status.insert(status.clone(), to_id);
300            } else {
301                default = Some(to_id);
302            }
303        }
304        if any_status {
305            return Ok(Routing::Branch { on_status, default });
306        }
307        if let Some(default) = default {
308            return Ok(Routing::Branch {
309                on_status,
310                default: Some(default),
311            });
312        }
313    }
314
315    Ok(Routing::Custom(raw.clone()))
316}
317
318fn resolve_entry(doc: &FlowDoc) -> Option<String> {
319    if let Some(start) = &doc.start {
320        return Some(start.clone());
321    }
322    if doc.nodes.contains_key("in") {
323        return Some("in".to_string());
324    }
325    doc.nodes.keys().next().cloned()
326}