greentic_flow/
lib.rs

1//! Downstream runtimes must set the current tenant telemetry context via
2//! `greentic_types::telemetry::set_current_tenant_ctx` before executing flows
3//! (for example, prior to `FlowEngine::run` in the host runner).
4#![forbid(unsafe_code)]
5#![allow(clippy::result_large_err)]
6
7pub mod config_flow;
8pub mod error;
9pub mod flow_bundle;
10pub mod ir;
11pub mod json_output;
12pub mod lint;
13pub mod loader;
14pub mod model;
15pub mod path_safety;
16pub mod registry;
17pub mod resolve;
18pub mod splice;
19pub mod util;
20
21pub use flow_bundle::{
22    ComponentPin, FlowBundle, NodeRef, blake3_hex, canonicalize_json, extract_component_pins,
23    load_and_validate_bundle, load_and_validate_bundle_with_flow,
24};
25pub use json_output::{JsonDiagnostic, LintJsonOutput, lint_to_stdout_json};
26pub use splice::{NEXT_NODE_PLACEHOLDER, splice_node_after};
27
28use crate::{error::Result, model::FlowDoc};
29use greentic_types::{
30    ComponentId, Flow, FlowComponentRef, FlowId, FlowKind, FlowMetadata, InputMapping, Node,
31    NodeId, OutputMapping, Routing, TelemetryHints, flow::FlowHasher,
32};
33use indexmap::IndexMap;
34use serde_json::Value;
35use std::collections::{BTreeMap, BTreeSet};
36use std::path::Path;
37
38/// Map a YAML flow type string to [`FlowKind`].
39pub fn map_flow_type(flow_type: &str) -> Result<FlowKind> {
40    match flow_type {
41        "messaging" => Ok(FlowKind::Messaging),
42        "event" | "events" => Ok(FlowKind::Event),
43        "component-config" => Ok(FlowKind::ComponentConfig),
44        "job" => Ok(FlowKind::Job),
45        "http" => Ok(FlowKind::Http),
46        other => Err(crate::error::FlowError::UnknownFlowType {
47            flow_type: other.to_string(),
48            location: crate::error::FlowErrorLocation::at_path("type"),
49        }),
50    }
51}
52
53/// Compile a validated [`FlowDoc`] into the canonical [`Flow`] model.
54pub fn compile_flow(doc: FlowDoc) -> Result<Flow> {
55    let kind = map_flow_type(&doc.flow_type)?;
56    let mut entrypoints = doc.entrypoints.clone();
57    if let Some(entry) = resolve_entry(&doc) {
58        entrypoints
59            .entry("default".to_string())
60            .or_insert_with(|| Value::String(entry));
61    }
62
63    let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
64    for (node_id_str, node_doc) in doc.nodes.iter() {
65        let node_id = NodeId::new(node_id_str.as_str()).map_err(|e| {
66            crate::error::FlowError::InvalidIdentifier {
67                kind: "node",
68                value: node_id_str.clone(),
69                detail: e.to_string(),
70                location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
71            }
72        })?;
73        let component_id = ComponentId::new(node_doc.component.as_str()).map_err(|e| {
74            crate::error::FlowError::InvalidIdentifier {
75                kind: "component",
76                value: node_doc.component.clone(),
77                detail: e.to_string(),
78                location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
79            }
80        })?;
81        let routing = compile_routing(&node_doc.routing, &doc.nodes, node_id_str)?;
82        let telemetry = node_doc
83            .telemetry
84            .as_ref()
85            .map(|t| TelemetryHints {
86                span_name: t.span_name.clone(),
87                attributes: t.attributes.clone(),
88                sampling: t.sampling.clone(),
89            })
90            .unwrap_or_default();
91        let node = Node {
92            id: node_id.clone(),
93            component: FlowComponentRef {
94                id: component_id,
95                pack_alias: node_doc.pack_alias.clone(),
96                operation: node_doc.operation.clone(),
97            },
98            input: InputMapping {
99                mapping: node_doc.payload.clone(),
100            },
101            output: OutputMapping {
102                mapping: node_doc
103                    .output
104                    .clone()
105                    .unwrap_or_else(|| Value::Object(Default::default())),
106            },
107            routing,
108            telemetry,
109        };
110        nodes.insert(node_id, node);
111    }
112
113    let flow_id =
114        FlowId::new(doc.id.as_str()).map_err(|e| crate::error::FlowError::InvalidIdentifier {
115            kind: "flow",
116            value: doc.id.clone(),
117            detail: e.to_string(),
118            location: crate::error::FlowErrorLocation::at_path("id"),
119        })?;
120
121    Ok(Flow {
122        schema_version: "flow-v1".to_string(),
123        id: flow_id,
124        kind,
125        entrypoints,
126        nodes,
127        metadata: FlowMetadata {
128            title: doc.title,
129            description: doc.description,
130            tags: doc.tags.into_iter().collect::<BTreeSet<_>>(),
131            extra: doc.parameters,
132        },
133    })
134}
135
136/// Compile YGTC YAML text into [`Flow`].
137pub fn compile_ygtc_str(src: &str) -> Result<Flow> {
138    let doc = loader::load_ygtc_from_str(src)?;
139    compile_flow(doc)
140}
141
142/// Compile a YGTC file into [`Flow`].
143pub fn compile_ygtc_file(path: &Path) -> Result<Flow> {
144    let doc = loader::load_ygtc_from_path(path)?;
145    compile_flow(doc)
146}
147
148fn compile_routing(
149    raw: &Value,
150    nodes: &BTreeMap<String, crate::model::NodeDoc>,
151    node_id: &str,
152) -> Result<Routing> {
153    #[derive(serde::Deserialize)]
154    struct RouteDoc {
155        #[serde(default)]
156        to: Option<String>,
157        #[serde(default)]
158        out: Option<bool>,
159        #[serde(default)]
160        status: Option<String>,
161        #[serde(default)]
162        reply: Option<bool>,
163    }
164
165    let routes: Vec<RouteDoc> = if raw.is_null() {
166        Vec::new()
167    } else {
168        serde_json::from_value(raw.clone()).map_err(|e| crate::error::FlowError::Routing {
169            node_id: node_id.to_string(),
170            message: e.to_string(),
171            location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id}.routing")),
172        })?
173    };
174
175    if routes.len() == 1 {
176        let route = &routes[0];
177        let is_out = route.out.unwrap_or(false);
178        if route.reply.unwrap_or(false) {
179            return Ok(Routing::Reply);
180        }
181        if let Some(to) = &route.to {
182            if to == "out" || is_out {
183                return Ok(Routing::End);
184            }
185            if !nodes.contains_key(to) {
186                return Err(crate::error::FlowError::MissingNode {
187                    target: to.clone(),
188                    node_id: node_id.to_string(),
189                    location: crate::error::FlowErrorLocation::at_path(format!(
190                        "nodes.{node_id}.routing"
191                    )),
192                });
193            }
194            return Ok(Routing::Next {
195                node_id: NodeId::new(to.as_str()).map_err(|e| {
196                    crate::error::FlowError::InvalidIdentifier {
197                        kind: "node",
198                        value: to.clone(),
199                        detail: e.to_string(),
200                        location: crate::error::FlowErrorLocation::at_path(format!(
201                            "nodes.{node_id}.routing"
202                        )),
203                    }
204                })?,
205            });
206        }
207        if is_out {
208            return Ok(Routing::End);
209        }
210        if route.status.is_some() {
211            // single status route without a destination is ambiguous
212            return Ok(Routing::Custom(raw.clone()));
213        }
214    }
215
216    if routes.is_empty() {
217        return Ok(Routing::End);
218    }
219
220    // Attempt to build a Branch when multiple status routes are present.
221    if routes.len() >= 2 {
222        use std::collections::BTreeMap;
223        let mut on_status: BTreeMap<String, NodeId> = BTreeMap::new();
224        let mut default: Option<NodeId> = None;
225        let mut any_status = false;
226        for route in &routes {
227            if route.reply.unwrap_or(false) || route.out.unwrap_or(false) {
228                return Ok(Routing::Custom(raw.clone()));
229            }
230            let to = match &route.to {
231                Some(t) => t,
232                None => return Ok(Routing::Custom(raw.clone())),
233            };
234            if !nodes.contains_key(to) {
235                return Err(crate::error::FlowError::MissingNode {
236                    target: to.clone(),
237                    node_id: node_id.to_string(),
238                    location: crate::error::FlowErrorLocation::at_path(format!(
239                        "nodes.{node_id}.routing"
240                    )),
241                });
242            }
243            let to_id = NodeId::new(to.as_str()).map_err(|e| {
244                crate::error::FlowError::InvalidIdentifier {
245                    kind: "node",
246                    value: to.clone(),
247                    detail: e.to_string(),
248                    location: crate::error::FlowErrorLocation::at_path(format!(
249                        "nodes.{node_id}.routing"
250                    )),
251                }
252            })?;
253            if let Some(status) = &route.status {
254                any_status = true;
255                on_status.insert(status.clone(), to_id);
256            } else {
257                default = Some(to_id);
258            }
259        }
260        if any_status {
261            return Ok(Routing::Branch { on_status, default });
262        }
263        if let Some(default) = default {
264            return Ok(Routing::Branch {
265                on_status,
266                default: Some(default),
267            });
268        }
269    }
270
271    Ok(Routing::Custom(raw.clone()))
272}
273
274fn resolve_entry(doc: &FlowDoc) -> Option<String> {
275    if let Some(start) = &doc.start {
276        return Some(start.clone());
277    }
278    if doc.nodes.contains_key("in") {
279        return Some("in".to_string());
280    }
281    doc.nodes.keys().next().cloned()
282}