greentic_flow/
lib.rs

1//! Downstream runtimes must set the current tenant telemetry context via
2//! `greentic_types::telemetry::set_current_tenant_ctx` before executing flows
3//! (for example, prior to `FlowEngine::run` in the host runner).
4#![forbid(unsafe_code)]
5#![allow(clippy::result_large_err)]
6
7pub mod config_flow;
8pub mod error;
9pub mod flow_bundle;
10pub mod ir;
11pub mod json_output;
12pub mod lint;
13pub mod loader;
14pub mod model;
15pub mod path_safety;
16pub mod registry;
17pub mod resolve;
18pub mod util;
19
20pub use flow_bundle::{
21    ComponentPin, FlowBundle, NodeRef, blake3_hex, canonicalize_json, extract_component_pins,
22    load_and_validate_bundle, load_and_validate_bundle_with_flow,
23};
24pub use json_output::{JsonDiagnostic, LintJsonOutput, lint_to_stdout_json};
25
26use crate::{error::Result, model::FlowDoc};
27use greentic_types::{
28    ComponentId, Flow, FlowComponentRef, FlowId, FlowKind, FlowMetadata, InputMapping, Node,
29    NodeId, OutputMapping, Routing, TelemetryHints, flow::FlowHasher,
30};
31use indexmap::IndexMap;
32use serde_json::Value;
33use std::collections::{BTreeMap, BTreeSet};
34use std::path::Path;
35
36/// Map a YAML flow type string to [`FlowKind`].
37pub fn map_flow_type(flow_type: &str) -> Result<FlowKind> {
38    match flow_type {
39        "messaging" => Ok(FlowKind::Messaging),
40        "event" | "events" => Ok(FlowKind::Event),
41        "component-config" => Ok(FlowKind::ComponentConfig),
42        "job" => Ok(FlowKind::Job),
43        "http" => Ok(FlowKind::Http),
44        other => Err(crate::error::FlowError::UnknownFlowType {
45            flow_type: other.to_string(),
46            location: crate::error::FlowErrorLocation::at_path("type"),
47        }),
48    }
49}
50
51/// Compile a validated [`FlowDoc`] into the canonical [`Flow`] model.
52pub fn compile_flow(doc: FlowDoc) -> Result<Flow> {
53    let kind = map_flow_type(&doc.flow_type)?;
54    let mut entrypoints = doc.entrypoints.clone();
55    if let Some(entry) = resolve_entry(&doc) {
56        entrypoints
57            .entry("default".to_string())
58            .or_insert_with(|| Value::String(entry));
59    }
60
61    let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
62    for (node_id_str, node_doc) in doc.nodes.iter() {
63        let node_id = NodeId::new(node_id_str.as_str()).map_err(|e| {
64            crate::error::FlowError::InvalidIdentifier {
65                kind: "node",
66                value: node_id_str.clone(),
67                detail: e.to_string(),
68                location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
69            }
70        })?;
71        let component_id = ComponentId::new(node_doc.component.as_str()).map_err(|e| {
72            crate::error::FlowError::InvalidIdentifier {
73                kind: "component",
74                value: node_doc.component.clone(),
75                detail: e.to_string(),
76                location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
77            }
78        })?;
79        let routing = compile_routing(&node_doc.routing, &doc.nodes, node_id_str)?;
80        let telemetry = node_doc
81            .telemetry
82            .as_ref()
83            .map(|t| TelemetryHints {
84                span_name: t.span_name.clone(),
85                attributes: t.attributes.clone(),
86                sampling: t.sampling.clone(),
87            })
88            .unwrap_or_default();
89        let node = Node {
90            id: node_id.clone(),
91            component: FlowComponentRef {
92                id: component_id,
93                pack_alias: node_doc.pack_alias.clone(),
94                operation: node_doc.operation.clone(),
95            },
96            input: InputMapping {
97                mapping: node_doc.payload.clone(),
98            },
99            output: OutputMapping {
100                mapping: node_doc
101                    .output
102                    .clone()
103                    .unwrap_or_else(|| Value::Object(Default::default())),
104            },
105            routing,
106            telemetry,
107        };
108        nodes.insert(node_id, node);
109    }
110
111    let flow_id =
112        FlowId::new(doc.id.as_str()).map_err(|e| crate::error::FlowError::InvalidIdentifier {
113            kind: "flow",
114            value: doc.id.clone(),
115            detail: e.to_string(),
116            location: crate::error::FlowErrorLocation::at_path("id"),
117        })?;
118
119    Ok(Flow {
120        schema_version: "flow-v1".to_string(),
121        id: flow_id,
122        kind,
123        entrypoints,
124        nodes,
125        metadata: FlowMetadata {
126            title: doc.title,
127            description: doc.description,
128            tags: doc.tags.into_iter().collect::<BTreeSet<_>>(),
129            extra: doc.parameters,
130        },
131    })
132}
133
134/// Compile YGTC YAML text into [`Flow`].
135pub fn compile_ygtc_str(src: &str) -> Result<Flow> {
136    let doc = loader::load_ygtc_from_str(src)?;
137    compile_flow(doc)
138}
139
140/// Compile a YGTC file into [`Flow`].
141pub fn compile_ygtc_file(path: &Path) -> Result<Flow> {
142    let doc = loader::load_ygtc_from_path(path)?;
143    compile_flow(doc)
144}
145
146fn compile_routing(
147    raw: &Value,
148    nodes: &BTreeMap<String, crate::model::NodeDoc>,
149    node_id: &str,
150) -> Result<Routing> {
151    #[derive(serde::Deserialize)]
152    struct RouteDoc {
153        #[serde(default)]
154        to: Option<String>,
155        #[serde(default)]
156        out: Option<bool>,
157        #[serde(default)]
158        status: Option<String>,
159        #[serde(default)]
160        reply: Option<bool>,
161    }
162
163    let routes: Vec<RouteDoc> = if raw.is_null() {
164        Vec::new()
165    } else {
166        serde_json::from_value(raw.clone()).map_err(|e| crate::error::FlowError::Routing {
167            node_id: node_id.to_string(),
168            message: e.to_string(),
169            location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id}.routing")),
170        })?
171    };
172
173    if routes.len() == 1 {
174        let route = &routes[0];
175        let is_out = route.out.unwrap_or(false);
176        if route.reply.unwrap_or(false) {
177            return Ok(Routing::Reply);
178        }
179        if let Some(to) = &route.to {
180            if to == "out" || is_out {
181                return Ok(Routing::End);
182            }
183            if !nodes.contains_key(to) {
184                return Err(crate::error::FlowError::MissingNode {
185                    target: to.clone(),
186                    node_id: node_id.to_string(),
187                    location: crate::error::FlowErrorLocation::at_path(format!(
188                        "nodes.{node_id}.routing"
189                    )),
190                });
191            }
192            return Ok(Routing::Next {
193                node_id: NodeId::new(to.as_str()).map_err(|e| {
194                    crate::error::FlowError::InvalidIdentifier {
195                        kind: "node",
196                        value: to.clone(),
197                        detail: e.to_string(),
198                        location: crate::error::FlowErrorLocation::at_path(format!(
199                            "nodes.{node_id}.routing"
200                        )),
201                    }
202                })?,
203            });
204        }
205        if is_out {
206            return Ok(Routing::End);
207        }
208        if route.status.is_some() {
209            // single status route without a destination is ambiguous
210            return Ok(Routing::Custom(raw.clone()));
211        }
212    }
213
214    if routes.is_empty() {
215        return Ok(Routing::End);
216    }
217
218    // Attempt to build a Branch when multiple status routes are present.
219    if routes.len() >= 2 {
220        use std::collections::BTreeMap;
221        let mut on_status: BTreeMap<String, NodeId> = BTreeMap::new();
222        let mut default: Option<NodeId> = None;
223        let mut any_status = false;
224        for route in &routes {
225            if route.reply.unwrap_or(false) || route.out.unwrap_or(false) {
226                return Ok(Routing::Custom(raw.clone()));
227            }
228            let to = match &route.to {
229                Some(t) => t,
230                None => return Ok(Routing::Custom(raw.clone())),
231            };
232            if !nodes.contains_key(to) {
233                return Err(crate::error::FlowError::MissingNode {
234                    target: to.clone(),
235                    node_id: node_id.to_string(),
236                    location: crate::error::FlowErrorLocation::at_path(format!(
237                        "nodes.{node_id}.routing"
238                    )),
239                });
240            }
241            let to_id = NodeId::new(to.as_str()).map_err(|e| {
242                crate::error::FlowError::InvalidIdentifier {
243                    kind: "node",
244                    value: to.clone(),
245                    detail: e.to_string(),
246                    location: crate::error::FlowErrorLocation::at_path(format!(
247                        "nodes.{node_id}.routing"
248                    )),
249                }
250            })?;
251            if let Some(status) = &route.status {
252                any_status = true;
253                on_status.insert(status.clone(), to_id);
254            } else {
255                default = Some(to_id);
256            }
257        }
258        if any_status {
259            return Ok(Routing::Branch { on_status, default });
260        }
261        if let Some(default) = default {
262            return Ok(Routing::Branch {
263                on_status,
264                default: Some(default),
265            });
266        }
267    }
268
269    Ok(Routing::Custom(raw.clone()))
270}
271
272fn resolve_entry(doc: &FlowDoc) -> Option<String> {
273    if let Some(start) = &doc.start {
274        return Some(start.clone());
275    }
276    if doc.nodes.contains_key("in") {
277        return Some("in".to_string());
278    }
279    doc.nodes.keys().next().cloned()
280}