Skip to main content

greentic_flow/
lib.rs

1//! Downstream runtimes must set the current tenant telemetry context via
2//! `greentic_types::telemetry::set_current_tenant_ctx` before executing flows
3//! (for example, prior to `FlowEngine::run` in the host runner).
4#![forbid(unsafe_code)]
5#![allow(clippy::result_large_err)]
6
7pub mod add_step;
8pub mod component_catalog;
9pub mod component_schema;
10pub mod config_flow;
11pub mod error;
12pub mod flow_bundle;
13pub mod flow_ir;
14pub mod ir;
15pub mod json_output;
16pub mod lint;
17pub mod loader;
18pub mod model;
19pub mod path_safety;
20pub mod questions;
21pub mod questions_schema;
22pub mod registry;
23pub mod resolve;
24pub mod resolve_summary;
25pub mod schema_mode;
26pub mod splice;
27pub mod template;
28pub mod util;
29
30pub use flow_bundle::{
31    ComponentPin, FlowBundle, NodeRef, blake3_hex, canonicalize_json, extract_component_pins,
32    load_and_validate_bundle, load_and_validate_bundle_with_flow,
33};
34pub use json_output::{JsonDiagnostic, LintJsonOutput, lint_to_stdout_json};
35pub use splice::{NEXT_NODE_PLACEHOLDER, splice_node_after};
36
37use crate::{error::Result, model::FlowDoc};
38use greentic_types::{
39    ComponentId, Flow, FlowComponentRef, FlowId, FlowKind, FlowMetadata, InputMapping, Node,
40    NodeId, OutputMapping, Routing, TelemetryHints, flow::FlowHasher,
41};
42use indexmap::IndexMap;
43use serde_json::Value;
44use std::collections::{BTreeMap, BTreeSet};
45use std::path::Path;
46
47/// Map a YAML flow type string to [`FlowKind`].
48pub fn map_flow_type(flow_type: &str) -> Result<FlowKind> {
49    match flow_type {
50        "messaging" => Ok(FlowKind::Messaging),
51        "event" | "events" => Ok(FlowKind::Event),
52        "component-config" => Ok(FlowKind::ComponentConfig),
53        "job" => Ok(FlowKind::Job),
54        "http" => Ok(FlowKind::Http),
55        other => Err(crate::error::FlowError::UnknownFlowType {
56            flow_type: other.to_string(),
57            location: crate::error::FlowErrorLocation::at_path("type"),
58        }),
59    }
60}
61
62/// Compile a validated [`FlowDoc`] into the canonical [`Flow`] model.
63pub fn compile_flow(doc: FlowDoc) -> Result<Flow> {
64    let kind = map_flow_type(&doc.flow_type)?;
65    let mut entrypoints = doc.entrypoints.clone();
66    if let Some(entry) = resolve_entry(&doc) {
67        entrypoints
68            .entry("default".to_string())
69            .or_insert_with(|| Value::String(entry));
70    }
71
72    let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
73    for (node_id_str, node_doc) in doc.nodes.iter() {
74        let node_id = NodeId::new(node_id_str.as_str()).map_err(|e| {
75            crate::error::FlowError::InvalidIdentifier {
76                kind: "node",
77                value: node_id_str.clone(),
78                detail: e.to_string(),
79                location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
80            }
81        })?;
82        let routing = compile_routing(&node_doc.routing, &doc.nodes, node_id_str)?;
83        let telemetry = node_doc
84            .telemetry
85            .as_ref()
86            .map(|t| TelemetryHints {
87                span_name: t.span_name.clone(),
88                attributes: t.attributes.clone(),
89                sampling: t.sampling.clone(),
90            })
91            .unwrap_or_default();
92        // V2: single op key in raw.
93        let mut op_key: Option<String> = None;
94        let mut payload: Option<Value> = None;
95        for (k, v) in &node_doc.raw {
96            op_key = Some(k.clone());
97            payload = Some(v.clone());
98        }
99        let output_mapping = node_doc
100            .raw
101            .get("output")
102            .cloned()
103            .unwrap_or_else(|| Value::Object(Default::default()));
104        let operation = op_key.ok_or_else(|| crate::error::FlowError::Internal {
105            message: format!("node '{node_id_str}' missing operation key"),
106            location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
107        })?;
108        let is_builtin = matches!(operation.as_str(), "questions" | "template");
109        let is_legacy = doc.schema_version.unwrap_or(1) < 2;
110        let (component_id, op_field) = if is_builtin || is_legacy {
111            (operation.clone(), None)
112        } else {
113            ("component.exec".to_string(), Some(operation.clone()))
114        };
115        let node = Node {
116            id: node_id.clone(),
117            component: FlowComponentRef {
118                id: ComponentId::new(&component_id).unwrap(),
119                pack_alias: None,
120                operation: op_field,
121            },
122            input: InputMapping {
123                mapping: payload.unwrap_or_else(|| Value::Object(Default::default())),
124            },
125            output: OutputMapping {
126                mapping: output_mapping,
127            },
128            routing,
129            telemetry,
130        };
131        nodes.insert(node_id, node);
132    }
133
134    let flow_id =
135        FlowId::new(doc.id.as_str()).map_err(|e| crate::error::FlowError::InvalidIdentifier {
136            kind: "flow",
137            value: doc.id.clone(),
138            detail: e.to_string(),
139            location: crate::error::FlowErrorLocation::at_path("id"),
140        })?;
141
142    let entrypoints_map: BTreeMap<String, Value> = entrypoints.into_iter().collect();
143
144    Ok(Flow {
145        schema_version: "flow-v1".to_string(),
146        id: flow_id,
147        kind,
148        entrypoints: entrypoints_map,
149        nodes,
150        metadata: FlowMetadata {
151            title: doc.title,
152            description: doc.description,
153            tags: doc.tags.into_iter().collect::<BTreeSet<_>>(),
154            extra: doc.parameters,
155        },
156    })
157}
158
159/// Compile YGTC YAML text into [`Flow`].
160pub fn compile_ygtc_str(src: &str) -> Result<Flow> {
161    let doc = loader::load_ygtc_from_str(src)?;
162    compile_flow(doc)
163}
164
165/// Compile a YGTC file into [`Flow`].
166pub fn compile_ygtc_file(path: &Path) -> Result<Flow> {
167    let doc = loader::load_ygtc_from_path(path)?;
168    compile_flow(doc)
169}
170
171fn compile_routing(
172    raw: &Value,
173    nodes: &IndexMap<String, crate::model::NodeDoc>,
174    node_id: &str,
175) -> Result<Routing> {
176    #[derive(serde::Deserialize)]
177    struct RouteDoc {
178        #[serde(default)]
179        to: Option<String>,
180        #[serde(default)]
181        out: Option<bool>,
182        #[serde(default)]
183        status: Option<String>,
184        #[serde(default)]
185        reply: Option<bool>,
186    }
187
188    let routes: Vec<RouteDoc> = if raw.is_null() {
189        Vec::new()
190    } else if let Some(shorthand) = raw.as_str() {
191        match shorthand {
192            "out" => vec![RouteDoc {
193                to: None,
194                out: Some(true),
195                status: None,
196                reply: None,
197            }],
198            "reply" => vec![RouteDoc {
199                to: None,
200                out: None,
201                status: None,
202                reply: Some(true),
203            }],
204            other => {
205                return Err(crate::error::FlowError::Routing {
206                    node_id: node_id.to_string(),
207                    message: format!("invalid routing shorthand '{other}'"),
208                    location: crate::error::FlowErrorLocation::at_path(format!(
209                        "nodes.{node_id}.routing"
210                    )),
211                });
212            }
213        }
214    } else {
215        serde_json::from_value(raw.clone()).map_err(|e| crate::error::FlowError::Routing {
216            node_id: node_id.to_string(),
217            message: e.to_string(),
218            location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id}.routing")),
219        })?
220    };
221
222    if routes.len() == 1 {
223        let route = &routes[0];
224        let is_out = route.out.unwrap_or(false);
225        if route.reply.unwrap_or(false) {
226            return Ok(Routing::Reply);
227        }
228        if let Some(to) = &route.to {
229            if to == "out" || is_out {
230                return Ok(Routing::End);
231            }
232            if !nodes.contains_key(to) {
233                return Err(crate::error::FlowError::MissingNode {
234                    target: to.clone(),
235                    node_id: node_id.to_string(),
236                    location: crate::error::FlowErrorLocation::at_path(format!(
237                        "nodes.{node_id}.routing"
238                    )),
239                });
240            }
241            return Ok(Routing::Next {
242                node_id: NodeId::new(to.as_str()).map_err(|e| {
243                    crate::error::FlowError::InvalidIdentifier {
244                        kind: "node",
245                        value: to.clone(),
246                        detail: e.to_string(),
247                        location: crate::error::FlowErrorLocation::at_path(format!(
248                            "nodes.{node_id}.routing"
249                        )),
250                    }
251                })?,
252            });
253        }
254        if is_out {
255            return Ok(Routing::End);
256        }
257        if route.status.is_some() {
258            // single status route without a destination is ambiguous
259            return Ok(Routing::Custom(raw.clone()));
260        }
261    }
262
263    if routes.is_empty() {
264        return Ok(Routing::End);
265    }
266
267    // Attempt to build a Branch when multiple status routes are present.
268    if routes.len() >= 2 {
269        use std::collections::BTreeMap;
270        let mut on_status: BTreeMap<String, NodeId> = BTreeMap::new();
271        let mut default: Option<NodeId> = None;
272        let mut any_status = false;
273        for route in &routes {
274            if route.reply.unwrap_or(false) || route.out.unwrap_or(false) {
275                return Ok(Routing::Custom(raw.clone()));
276            }
277            let to = match &route.to {
278                Some(t) => t,
279                None => return Ok(Routing::Custom(raw.clone())),
280            };
281            if !nodes.contains_key(to) {
282                return Err(crate::error::FlowError::MissingNode {
283                    target: to.clone(),
284                    node_id: node_id.to_string(),
285                    location: crate::error::FlowErrorLocation::at_path(format!(
286                        "nodes.{node_id}.routing"
287                    )),
288                });
289            }
290            let to_id = NodeId::new(to.as_str()).map_err(|e| {
291                crate::error::FlowError::InvalidIdentifier {
292                    kind: "node",
293                    value: to.clone(),
294                    detail: e.to_string(),
295                    location: crate::error::FlowErrorLocation::at_path(format!(
296                        "nodes.{node_id}.routing"
297                    )),
298                }
299            })?;
300            if let Some(status) = &route.status {
301                any_status = true;
302                on_status.insert(status.clone(), to_id);
303            } else {
304                default = Some(to_id);
305            }
306        }
307        if any_status {
308            return Ok(Routing::Branch { on_status, default });
309        }
310        if let Some(default) = default {
311            return Ok(Routing::Branch {
312                on_status,
313                default: Some(default),
314            });
315        }
316    }
317
318    Ok(Routing::Custom(raw.clone()))
319}
320
321fn resolve_entry(doc: &FlowDoc) -> Option<String> {
322    if let Some(start) = &doc.start {
323        return Some(start.clone());
324    }
325    if doc.nodes.contains_key("in") {
326        return Some("in".to_string());
327    }
328    doc.nodes.keys().next().cloned()
329}