greentic_flow/
lib.rs

1//! Downstream runtimes must set the current tenant telemetry context via
2//! `greentic_types::telemetry::set_current_tenant_ctx` before executing flows
3//! (for example, prior to `FlowEngine::run` in the host runner).
4#![forbid(unsafe_code)]
5#![allow(clippy::result_large_err)]
6
7pub mod add_step;
8pub mod component_catalog;
9pub mod config_flow;
10pub mod error;
11pub mod flow_bundle;
12pub mod flow_ir;
13pub mod ir;
14pub mod json_output;
15pub mod lint;
16pub mod loader;
17pub mod model;
18pub mod path_safety;
19pub mod registry;
20pub mod resolve;
21pub mod splice;
22pub mod util;
23
24pub use flow_bundle::{
25    ComponentPin, FlowBundle, NodeRef, blake3_hex, canonicalize_json, extract_component_pins,
26    load_and_validate_bundle, load_and_validate_bundle_with_flow,
27};
28pub use json_output::{JsonDiagnostic, LintJsonOutput, lint_to_stdout_json};
29pub use splice::{NEXT_NODE_PLACEHOLDER, splice_node_after};
30
31use crate::{error::Result, model::FlowDoc};
32use greentic_types::{
33    ComponentId, Flow, FlowComponentRef, FlowId, FlowKind, FlowMetadata, InputMapping, Node,
34    NodeId, OutputMapping, Routing, TelemetryHints, flow::FlowHasher,
35};
36use indexmap::IndexMap;
37use serde_json::Value;
38use std::collections::{BTreeMap, BTreeSet};
39use std::path::Path;
40
41/// Map a YAML flow type string to [`FlowKind`].
42pub fn map_flow_type(flow_type: &str) -> Result<FlowKind> {
43    match flow_type {
44        "messaging" => Ok(FlowKind::Messaging),
45        "event" | "events" => Ok(FlowKind::Event),
46        "component-config" => Ok(FlowKind::ComponentConfig),
47        "job" => Ok(FlowKind::Job),
48        "http" => Ok(FlowKind::Http),
49        other => Err(crate::error::FlowError::UnknownFlowType {
50            flow_type: other.to_string(),
51            location: crate::error::FlowErrorLocation::at_path("type"),
52        }),
53    }
54}
55
56/// Compile a validated [`FlowDoc`] into the canonical [`Flow`] model.
57pub fn compile_flow(doc: FlowDoc) -> Result<Flow> {
58    let kind = map_flow_type(&doc.flow_type)?;
59    let mut entrypoints = doc.entrypoints.clone();
60    if let Some(entry) = resolve_entry(&doc) {
61        entrypoints
62            .entry("default".to_string())
63            .or_insert_with(|| Value::String(entry));
64    }
65
66    let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
67    for (node_id_str, node_doc) in doc.nodes.iter() {
68        let node_id = NodeId::new(node_id_str.as_str()).map_err(|e| {
69            crate::error::FlowError::InvalidIdentifier {
70                kind: "node",
71                value: node_id_str.clone(),
72                detail: e.to_string(),
73                location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
74            }
75        })?;
76        let component_id = ComponentId::new(node_doc.component.as_str()).map_err(|e| {
77            crate::error::FlowError::InvalidIdentifier {
78                kind: "component",
79                value: node_doc.component.clone(),
80                detail: e.to_string(),
81                location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
82            }
83        })?;
84        let routing = compile_routing(&node_doc.routing, &doc.nodes, node_id_str)?;
85        let telemetry = node_doc
86            .telemetry
87            .as_ref()
88            .map(|t| TelemetryHints {
89                span_name: t.span_name.clone(),
90                attributes: t.attributes.clone(),
91                sampling: t.sampling.clone(),
92            })
93            .unwrap_or_default();
94        let node = Node {
95            id: node_id.clone(),
96            component: FlowComponentRef {
97                id: component_id,
98                pack_alias: node_doc.pack_alias.clone(),
99                operation: node_doc.operation.clone(),
100            },
101            input: InputMapping {
102                mapping: node_doc.payload.clone(),
103            },
104            output: OutputMapping {
105                mapping: node_doc
106                    .output
107                    .clone()
108                    .unwrap_or_else(|| Value::Object(Default::default())),
109            },
110            routing,
111            telemetry,
112        };
113        nodes.insert(node_id, node);
114    }
115
116    let flow_id =
117        FlowId::new(doc.id.as_str()).map_err(|e| crate::error::FlowError::InvalidIdentifier {
118            kind: "flow",
119            value: doc.id.clone(),
120            detail: e.to_string(),
121            location: crate::error::FlowErrorLocation::at_path("id"),
122        })?;
123
124    Ok(Flow {
125        schema_version: "flow-v1".to_string(),
126        id: flow_id,
127        kind,
128        entrypoints,
129        nodes,
130        metadata: FlowMetadata {
131            title: doc.title,
132            description: doc.description,
133            tags: doc.tags.into_iter().collect::<BTreeSet<_>>(),
134            extra: doc.parameters,
135        },
136    })
137}
138
139/// Compile YGTC YAML text into [`Flow`].
140pub fn compile_ygtc_str(src: &str) -> Result<Flow> {
141    let doc = loader::load_ygtc_from_str(src)?;
142    compile_flow(doc)
143}
144
145/// Compile a YGTC file into [`Flow`].
146pub fn compile_ygtc_file(path: &Path) -> Result<Flow> {
147    let doc = loader::load_ygtc_from_path(path)?;
148    compile_flow(doc)
149}
150
151fn compile_routing(
152    raw: &Value,
153    nodes: &BTreeMap<String, crate::model::NodeDoc>,
154    node_id: &str,
155) -> Result<Routing> {
156    #[derive(serde::Deserialize)]
157    struct RouteDoc {
158        #[serde(default)]
159        to: Option<String>,
160        #[serde(default)]
161        out: Option<bool>,
162        #[serde(default)]
163        status: Option<String>,
164        #[serde(default)]
165        reply: Option<bool>,
166    }
167
168    let routes: Vec<RouteDoc> = if raw.is_null() {
169        Vec::new()
170    } else {
171        serde_json::from_value(raw.clone()).map_err(|e| crate::error::FlowError::Routing {
172            node_id: node_id.to_string(),
173            message: e.to_string(),
174            location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id}.routing")),
175        })?
176    };
177
178    if routes.len() == 1 {
179        let route = &routes[0];
180        let is_out = route.out.unwrap_or(false);
181        if route.reply.unwrap_or(false) {
182            return Ok(Routing::Reply);
183        }
184        if let Some(to) = &route.to {
185            if to == "out" || is_out {
186                return Ok(Routing::End);
187            }
188            if !nodes.contains_key(to) {
189                return Err(crate::error::FlowError::MissingNode {
190                    target: to.clone(),
191                    node_id: node_id.to_string(),
192                    location: crate::error::FlowErrorLocation::at_path(format!(
193                        "nodes.{node_id}.routing"
194                    )),
195                });
196            }
197            return Ok(Routing::Next {
198                node_id: NodeId::new(to.as_str()).map_err(|e| {
199                    crate::error::FlowError::InvalidIdentifier {
200                        kind: "node",
201                        value: to.clone(),
202                        detail: e.to_string(),
203                        location: crate::error::FlowErrorLocation::at_path(format!(
204                            "nodes.{node_id}.routing"
205                        )),
206                    }
207                })?,
208            });
209        }
210        if is_out {
211            return Ok(Routing::End);
212        }
213        if route.status.is_some() {
214            // single status route without a destination is ambiguous
215            return Ok(Routing::Custom(raw.clone()));
216        }
217    }
218
219    if routes.is_empty() {
220        return Ok(Routing::End);
221    }
222
223    // Attempt to build a Branch when multiple status routes are present.
224    if routes.len() >= 2 {
225        use std::collections::BTreeMap;
226        let mut on_status: BTreeMap<String, NodeId> = BTreeMap::new();
227        let mut default: Option<NodeId> = None;
228        let mut any_status = false;
229        for route in &routes {
230            if route.reply.unwrap_or(false) || route.out.unwrap_or(false) {
231                return Ok(Routing::Custom(raw.clone()));
232            }
233            let to = match &route.to {
234                Some(t) => t,
235                None => return Ok(Routing::Custom(raw.clone())),
236            };
237            if !nodes.contains_key(to) {
238                return Err(crate::error::FlowError::MissingNode {
239                    target: to.clone(),
240                    node_id: node_id.to_string(),
241                    location: crate::error::FlowErrorLocation::at_path(format!(
242                        "nodes.{node_id}.routing"
243                    )),
244                });
245            }
246            let to_id = NodeId::new(to.as_str()).map_err(|e| {
247                crate::error::FlowError::InvalidIdentifier {
248                    kind: "node",
249                    value: to.clone(),
250                    detail: e.to_string(),
251                    location: crate::error::FlowErrorLocation::at_path(format!(
252                        "nodes.{node_id}.routing"
253                    )),
254                }
255            })?;
256            if let Some(status) = &route.status {
257                any_status = true;
258                on_status.insert(status.clone(), to_id);
259            } else {
260                default = Some(to_id);
261            }
262        }
263        if any_status {
264            return Ok(Routing::Branch { on_status, default });
265        }
266        if let Some(default) = default {
267            return Ok(Routing::Branch {
268                on_status,
269                default: Some(default),
270            });
271        }
272    }
273
274    Ok(Routing::Custom(raw.clone()))
275}
276
277fn resolve_entry(doc: &FlowDoc) -> Option<String> {
278    if let Some(start) = &doc.start {
279        return Some(start.clone());
280    }
281    if doc.nodes.contains_key("in") {
282        return Some("in".to_string());
283    }
284    doc.nodes.keys().next().cloned()
285}