Skip to main content

greentic_flow/
lib.rs

1//! Downstream runtimes must set the current tenant telemetry context via
2//! `greentic_types::telemetry::set_current_tenant_ctx` before executing flows
3//! (for example, prior to `FlowEngine::run` in the host runner).
4#![forbid(unsafe_code)]
5#![allow(clippy::result_large_err)]
6
7pub mod add_step;
8pub mod component_catalog;
9pub mod config_flow;
10pub mod error;
11pub mod flow_bundle;
12pub mod flow_ir;
13pub mod ir;
14pub mod json_output;
15pub mod lint;
16pub mod loader;
17pub mod model;
18pub mod path_safety;
19pub mod questions;
20pub mod questions_schema;
21pub mod registry;
22pub mod resolve;
23pub mod resolve_summary;
24pub mod splice;
25pub mod template;
26pub mod util;
27
28pub use flow_bundle::{
29    ComponentPin, FlowBundle, NodeRef, blake3_hex, canonicalize_json, extract_component_pins,
30    load_and_validate_bundle, load_and_validate_bundle_with_flow,
31};
32pub use json_output::{JsonDiagnostic, LintJsonOutput, lint_to_stdout_json};
33pub use splice::{NEXT_NODE_PLACEHOLDER, splice_node_after};
34
35use crate::{error::Result, model::FlowDoc};
36use greentic_types::{
37    ComponentId, Flow, FlowComponentRef, FlowId, FlowKind, FlowMetadata, InputMapping, Node,
38    NodeId, OutputMapping, Routing, TelemetryHints, flow::FlowHasher,
39};
40use indexmap::IndexMap;
41use serde_json::Value;
42use std::collections::{BTreeMap, BTreeSet};
43use std::path::Path;
44
45/// Map a YAML flow type string to [`FlowKind`].
46pub fn map_flow_type(flow_type: &str) -> Result<FlowKind> {
47    match flow_type {
48        "messaging" => Ok(FlowKind::Messaging),
49        "event" | "events" => Ok(FlowKind::Event),
50        "component-config" => Ok(FlowKind::ComponentConfig),
51        "job" => Ok(FlowKind::Job),
52        "http" => Ok(FlowKind::Http),
53        other => Err(crate::error::FlowError::UnknownFlowType {
54            flow_type: other.to_string(),
55            location: crate::error::FlowErrorLocation::at_path("type"),
56        }),
57    }
58}
59
60/// Compile a validated [`FlowDoc`] into the canonical [`Flow`] model.
61pub fn compile_flow(doc: FlowDoc) -> Result<Flow> {
62    let kind = map_flow_type(&doc.flow_type)?;
63    let mut entrypoints = doc.entrypoints.clone();
64    if let Some(entry) = resolve_entry(&doc) {
65        entrypoints
66            .entry("default".to_string())
67            .or_insert_with(|| Value::String(entry));
68    }
69
70    let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
71    for (node_id_str, node_doc) in doc.nodes.iter() {
72        let node_id = NodeId::new(node_id_str.as_str()).map_err(|e| {
73            crate::error::FlowError::InvalidIdentifier {
74                kind: "node",
75                value: node_id_str.clone(),
76                detail: e.to_string(),
77                location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
78            }
79        })?;
80        let routing = compile_routing(&node_doc.routing, &doc.nodes, node_id_str)?;
81        let telemetry = node_doc
82            .telemetry
83            .as_ref()
84            .map(|t| TelemetryHints {
85                span_name: t.span_name.clone(),
86                attributes: t.attributes.clone(),
87                sampling: t.sampling.clone(),
88            })
89            .unwrap_or_default();
90        // V2: single op key in raw.
91        let mut op_key: Option<String> = None;
92        let mut payload: Option<Value> = None;
93        for (k, v) in &node_doc.raw {
94            op_key = Some(k.clone());
95            payload = Some(v.clone());
96        }
97        let output_mapping = node_doc
98            .raw
99            .get("output")
100            .cloned()
101            .unwrap_or_else(|| Value::Object(Default::default()));
102        let operation = op_key.ok_or_else(|| crate::error::FlowError::Internal {
103            message: format!("node '{node_id_str}' missing operation key"),
104            location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
105        })?;
106        let is_builtin = matches!(operation.as_str(), "questions" | "template");
107        let is_legacy = doc.schema_version.unwrap_or(1) < 2;
108        let (component_id, op_field) = if is_builtin || is_legacy {
109            (operation.clone(), None)
110        } else {
111            ("component.exec".to_string(), Some(operation.clone()))
112        };
113        let node = Node {
114            id: node_id.clone(),
115            component: FlowComponentRef {
116                id: ComponentId::new(&component_id).unwrap(),
117                pack_alias: None,
118                operation: op_field,
119            },
120            input: InputMapping {
121                mapping: payload.unwrap_or_else(|| Value::Object(Default::default())),
122            },
123            output: OutputMapping {
124                mapping: output_mapping,
125            },
126            routing,
127            telemetry,
128        };
129        nodes.insert(node_id, node);
130    }
131
132    let flow_id =
133        FlowId::new(doc.id.as_str()).map_err(|e| crate::error::FlowError::InvalidIdentifier {
134            kind: "flow",
135            value: doc.id.clone(),
136            detail: e.to_string(),
137            location: crate::error::FlowErrorLocation::at_path("id"),
138        })?;
139
140    let entrypoints_map: BTreeMap<String, Value> = entrypoints.into_iter().collect();
141
142    Ok(Flow {
143        schema_version: "flow-v1".to_string(),
144        id: flow_id,
145        kind,
146        entrypoints: entrypoints_map,
147        nodes,
148        metadata: FlowMetadata {
149            title: doc.title,
150            description: doc.description,
151            tags: doc.tags.into_iter().collect::<BTreeSet<_>>(),
152            extra: doc.parameters,
153        },
154    })
155}
156
157/// Compile YGTC YAML text into [`Flow`].
158pub fn compile_ygtc_str(src: &str) -> Result<Flow> {
159    let doc = loader::load_ygtc_from_str(src)?;
160    compile_flow(doc)
161}
162
163/// Compile a YGTC file into [`Flow`].
164pub fn compile_ygtc_file(path: &Path) -> Result<Flow> {
165    let doc = loader::load_ygtc_from_path(path)?;
166    compile_flow(doc)
167}
168
169fn compile_routing(
170    raw: &Value,
171    nodes: &IndexMap<String, crate::model::NodeDoc>,
172    node_id: &str,
173) -> Result<Routing> {
174    #[derive(serde::Deserialize)]
175    struct RouteDoc {
176        #[serde(default)]
177        to: Option<String>,
178        #[serde(default)]
179        out: Option<bool>,
180        #[serde(default)]
181        status: Option<String>,
182        #[serde(default)]
183        reply: Option<bool>,
184    }
185
186    let routes: Vec<RouteDoc> = if raw.is_null() {
187        Vec::new()
188    } else if let Some(shorthand) = raw.as_str() {
189        match shorthand {
190            "out" => vec![RouteDoc {
191                to: None,
192                out: Some(true),
193                status: None,
194                reply: None,
195            }],
196            "reply" => vec![RouteDoc {
197                to: None,
198                out: None,
199                status: None,
200                reply: Some(true),
201            }],
202            other => {
203                return Err(crate::error::FlowError::Routing {
204                    node_id: node_id.to_string(),
205                    message: format!("invalid routing shorthand '{other}'"),
206                    location: crate::error::FlowErrorLocation::at_path(format!(
207                        "nodes.{node_id}.routing"
208                    )),
209                });
210            }
211        }
212    } else {
213        serde_json::from_value(raw.clone()).map_err(|e| crate::error::FlowError::Routing {
214            node_id: node_id.to_string(),
215            message: e.to_string(),
216            location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id}.routing")),
217        })?
218    };
219
220    if routes.len() == 1 {
221        let route = &routes[0];
222        let is_out = route.out.unwrap_or(false);
223        if route.reply.unwrap_or(false) {
224            return Ok(Routing::Reply);
225        }
226        if let Some(to) = &route.to {
227            if to == "out" || is_out {
228                return Ok(Routing::End);
229            }
230            if !nodes.contains_key(to) {
231                return Err(crate::error::FlowError::MissingNode {
232                    target: to.clone(),
233                    node_id: node_id.to_string(),
234                    location: crate::error::FlowErrorLocation::at_path(format!(
235                        "nodes.{node_id}.routing"
236                    )),
237                });
238            }
239            return Ok(Routing::Next {
240                node_id: NodeId::new(to.as_str()).map_err(|e| {
241                    crate::error::FlowError::InvalidIdentifier {
242                        kind: "node",
243                        value: to.clone(),
244                        detail: e.to_string(),
245                        location: crate::error::FlowErrorLocation::at_path(format!(
246                            "nodes.{node_id}.routing"
247                        )),
248                    }
249                })?,
250            });
251        }
252        if is_out {
253            return Ok(Routing::End);
254        }
255        if route.status.is_some() {
256            // single status route without a destination is ambiguous
257            return Ok(Routing::Custom(raw.clone()));
258        }
259    }
260
261    if routes.is_empty() {
262        return Ok(Routing::End);
263    }
264
265    // Attempt to build a Branch when multiple status routes are present.
266    if routes.len() >= 2 {
267        use std::collections::BTreeMap;
268        let mut on_status: BTreeMap<String, NodeId> = BTreeMap::new();
269        let mut default: Option<NodeId> = None;
270        let mut any_status = false;
271        for route in &routes {
272            if route.reply.unwrap_or(false) || route.out.unwrap_or(false) {
273                return Ok(Routing::Custom(raw.clone()));
274            }
275            let to = match &route.to {
276                Some(t) => t,
277                None => return Ok(Routing::Custom(raw.clone())),
278            };
279            if !nodes.contains_key(to) {
280                return Err(crate::error::FlowError::MissingNode {
281                    target: to.clone(),
282                    node_id: node_id.to_string(),
283                    location: crate::error::FlowErrorLocation::at_path(format!(
284                        "nodes.{node_id}.routing"
285                    )),
286                });
287            }
288            let to_id = NodeId::new(to.as_str()).map_err(|e| {
289                crate::error::FlowError::InvalidIdentifier {
290                    kind: "node",
291                    value: to.clone(),
292                    detail: e.to_string(),
293                    location: crate::error::FlowErrorLocation::at_path(format!(
294                        "nodes.{node_id}.routing"
295                    )),
296                }
297            })?;
298            if let Some(status) = &route.status {
299                any_status = true;
300                on_status.insert(status.clone(), to_id);
301            } else {
302                default = Some(to_id);
303            }
304        }
305        if any_status {
306            return Ok(Routing::Branch { on_status, default });
307        }
308        if let Some(default) = default {
309            return Ok(Routing::Branch {
310                on_status,
311                default: Some(default),
312            });
313        }
314    }
315
316    Ok(Routing::Custom(raw.clone()))
317}
318
319fn resolve_entry(doc: &FlowDoc) -> Option<String> {
320    if let Some(start) = &doc.start {
321        return Some(start.clone());
322    }
323    if doc.nodes.contains_key("in") {
324        return Some("in".to_string());
325    }
326    doc.nodes.keys().next().cloned()
327}