greentic_flow/
lib.rs

1//! Downstream runtimes must set the current tenant telemetry context via
2//! `greentic_types::telemetry::set_current_tenant_ctx` before executing flows
3//! (for example, prior to `FlowEngine::run` in the host runner).
4#![forbid(unsafe_code)]
5#![allow(clippy::result_large_err)]
6
7pub mod add_step;
8pub mod component_catalog;
9pub mod config_flow;
10pub mod error;
11pub mod flow_bundle;
12pub mod flow_ir;
13pub mod ir;
14pub mod json_output;
15pub mod lint;
16pub mod loader;
17pub mod model;
18pub mod path_safety;
19pub mod registry;
20pub mod resolve;
21pub mod splice;
22pub mod util;
23
24pub use flow_bundle::{
25    ComponentPin, FlowBundle, NodeRef, blake3_hex, canonicalize_json, extract_component_pins,
26    load_and_validate_bundle, load_and_validate_bundle_with_flow,
27};
28pub use json_output::{JsonDiagnostic, LintJsonOutput, lint_to_stdout_json};
29pub use splice::{NEXT_NODE_PLACEHOLDER, splice_node_after};
30
31use crate::{error::Result, model::FlowDoc};
32use greentic_types::{
33    ComponentId, Flow, FlowComponentRef, FlowId, FlowKind, FlowMetadata, InputMapping, Node,
34    NodeId, OutputMapping, Routing, TelemetryHints, flow::FlowHasher,
35};
36use indexmap::IndexMap;
37use serde_json::Value;
38use std::collections::{BTreeMap, BTreeSet};
39use std::path::Path;
40
41/// Map a YAML flow type string to [`FlowKind`].
42pub fn map_flow_type(flow_type: &str) -> Result<FlowKind> {
43    match flow_type {
44        "messaging" => Ok(FlowKind::Messaging),
45        "event" | "events" => Ok(FlowKind::Event),
46        "component-config" => Ok(FlowKind::ComponentConfig),
47        "job" => Ok(FlowKind::Job),
48        "http" => Ok(FlowKind::Http),
49        other => Err(crate::error::FlowError::UnknownFlowType {
50            flow_type: other.to_string(),
51            location: crate::error::FlowErrorLocation::at_path("type"),
52        }),
53    }
54}
55
56/// Compile a validated [`FlowDoc`] into the canonical [`Flow`] model.
57pub fn compile_flow(doc: FlowDoc) -> Result<Flow> {
58    let kind = map_flow_type(&doc.flow_type)?;
59    let mut entrypoints = doc.entrypoints.clone();
60    if let Some(entry) = resolve_entry(&doc) {
61        entrypoints
62            .entry("default".to_string())
63            .or_insert_with(|| Value::String(entry));
64    }
65
66    let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
67    for (node_id_str, node_doc) in doc.nodes.iter() {
68        let node_id = NodeId::new(node_id_str.as_str()).map_err(|e| {
69            crate::error::FlowError::InvalidIdentifier {
70                kind: "node",
71                value: node_id_str.clone(),
72                detail: e.to_string(),
73                location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
74            }
75        })?;
76        let routing = compile_routing(&node_doc.routing, &doc.nodes, node_id_str)?;
77        let telemetry = node_doc
78            .telemetry
79            .as_ref()
80            .map(|t| TelemetryHints {
81                span_name: t.span_name.clone(),
82                attributes: t.attributes.clone(),
83                sampling: t.sampling.clone(),
84            })
85            .unwrap_or_default();
86        // V2: single op key in raw.
87        let mut op_key: Option<String> = None;
88        let mut payload: Option<Value> = None;
89        for (k, v) in &node_doc.raw {
90            op_key = Some(k.clone());
91            payload = Some(v.clone());
92        }
93        let output_mapping = node_doc
94            .raw
95            .get("output")
96            .cloned()
97            .unwrap_or_else(|| Value::Object(Default::default()));
98        let operation = op_key.ok_or_else(|| crate::error::FlowError::Internal {
99            message: format!("node '{node_id_str}' missing operation key"),
100            location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
101        })?;
102        let is_builtin = matches!(operation.as_str(), "questions" | "template");
103        let is_legacy = doc.schema_version.unwrap_or(1) < 2 || operation.contains('.');
104        let (component_id, op_field) = if is_builtin || is_legacy {
105            (operation.clone(), None)
106        } else {
107            ("component.exec".to_string(), Some(operation.clone()))
108        };
109        let node = Node {
110            id: node_id.clone(),
111            component: FlowComponentRef {
112                id: ComponentId::new(&component_id).unwrap(),
113                pack_alias: None,
114                operation: op_field,
115            },
116            input: InputMapping {
117                mapping: payload.unwrap_or_else(|| Value::Object(Default::default())),
118            },
119            output: OutputMapping {
120                mapping: output_mapping,
121            },
122            routing,
123            telemetry,
124        };
125        nodes.insert(node_id, node);
126    }
127
128    let flow_id =
129        FlowId::new(doc.id.as_str()).map_err(|e| crate::error::FlowError::InvalidIdentifier {
130            kind: "flow",
131            value: doc.id.clone(),
132            detail: e.to_string(),
133            location: crate::error::FlowErrorLocation::at_path("id"),
134        })?;
135
136    let entrypoints_map: BTreeMap<String, Value> = entrypoints.into_iter().collect();
137
138    Ok(Flow {
139        schema_version: "flow-v1".to_string(),
140        id: flow_id,
141        kind,
142        entrypoints: entrypoints_map,
143        nodes,
144        metadata: FlowMetadata {
145            title: doc.title,
146            description: doc.description,
147            tags: doc.tags.into_iter().collect::<BTreeSet<_>>(),
148            extra: doc.parameters,
149        },
150    })
151}
152
153/// Compile YGTC YAML text into [`Flow`].
154pub fn compile_ygtc_str(src: &str) -> Result<Flow> {
155    let doc = loader::load_ygtc_from_str(src)?;
156    compile_flow(doc)
157}
158
159/// Compile a YGTC file into [`Flow`].
160pub fn compile_ygtc_file(path: &Path) -> Result<Flow> {
161    let doc = loader::load_ygtc_from_path(path)?;
162    compile_flow(doc)
163}
164
165fn compile_routing(
166    raw: &Value,
167    nodes: &IndexMap<String, crate::model::NodeDoc>,
168    node_id: &str,
169) -> Result<Routing> {
170    #[derive(serde::Deserialize)]
171    struct RouteDoc {
172        #[serde(default)]
173        to: Option<String>,
174        #[serde(default)]
175        out: Option<bool>,
176        #[serde(default)]
177        status: Option<String>,
178        #[serde(default)]
179        reply: Option<bool>,
180    }
181
182    let routes: Vec<RouteDoc> = if raw.is_null() {
183        Vec::new()
184    } else if let Some(shorthand) = raw.as_str() {
185        match shorthand {
186            "out" => vec![RouteDoc {
187                to: None,
188                out: Some(true),
189                status: None,
190                reply: None,
191            }],
192            "reply" => vec![RouteDoc {
193                to: None,
194                out: None,
195                status: None,
196                reply: Some(true),
197            }],
198            other => {
199                return Err(crate::error::FlowError::Routing {
200                    node_id: node_id.to_string(),
201                    message: format!("invalid routing shorthand '{other}'"),
202                    location: crate::error::FlowErrorLocation::at_path(format!(
203                        "nodes.{node_id}.routing"
204                    )),
205                });
206            }
207        }
208    } else {
209        serde_json::from_value(raw.clone()).map_err(|e| crate::error::FlowError::Routing {
210            node_id: node_id.to_string(),
211            message: e.to_string(),
212            location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id}.routing")),
213        })?
214    };
215
216    if routes.len() == 1 {
217        let route = &routes[0];
218        let is_out = route.out.unwrap_or(false);
219        if route.reply.unwrap_or(false) {
220            return Ok(Routing::Reply);
221        }
222        if let Some(to) = &route.to {
223            if to == "out" || is_out {
224                return Ok(Routing::End);
225            }
226            if !nodes.contains_key(to) {
227                return Err(crate::error::FlowError::MissingNode {
228                    target: to.clone(),
229                    node_id: node_id.to_string(),
230                    location: crate::error::FlowErrorLocation::at_path(format!(
231                        "nodes.{node_id}.routing"
232                    )),
233                });
234            }
235            return Ok(Routing::Next {
236                node_id: NodeId::new(to.as_str()).map_err(|e| {
237                    crate::error::FlowError::InvalidIdentifier {
238                        kind: "node",
239                        value: to.clone(),
240                        detail: e.to_string(),
241                        location: crate::error::FlowErrorLocation::at_path(format!(
242                            "nodes.{node_id}.routing"
243                        )),
244                    }
245                })?,
246            });
247        }
248        if is_out {
249            return Ok(Routing::End);
250        }
251        if route.status.is_some() {
252            // single status route without a destination is ambiguous
253            return Ok(Routing::Custom(raw.clone()));
254        }
255    }
256
257    if routes.is_empty() {
258        return Ok(Routing::End);
259    }
260
261    // Attempt to build a Branch when multiple status routes are present.
262    if routes.len() >= 2 {
263        use std::collections::BTreeMap;
264        let mut on_status: BTreeMap<String, NodeId> = BTreeMap::new();
265        let mut default: Option<NodeId> = None;
266        let mut any_status = false;
267        for route in &routes {
268            if route.reply.unwrap_or(false) || route.out.unwrap_or(false) {
269                return Ok(Routing::Custom(raw.clone()));
270            }
271            let to = match &route.to {
272                Some(t) => t,
273                None => return Ok(Routing::Custom(raw.clone())),
274            };
275            if !nodes.contains_key(to) {
276                return Err(crate::error::FlowError::MissingNode {
277                    target: to.clone(),
278                    node_id: node_id.to_string(),
279                    location: crate::error::FlowErrorLocation::at_path(format!(
280                        "nodes.{node_id}.routing"
281                    )),
282                });
283            }
284            let to_id = NodeId::new(to.as_str()).map_err(|e| {
285                crate::error::FlowError::InvalidIdentifier {
286                    kind: "node",
287                    value: to.clone(),
288                    detail: e.to_string(),
289                    location: crate::error::FlowErrorLocation::at_path(format!(
290                        "nodes.{node_id}.routing"
291                    )),
292                }
293            })?;
294            if let Some(status) = &route.status {
295                any_status = true;
296                on_status.insert(status.clone(), to_id);
297            } else {
298                default = Some(to_id);
299            }
300        }
301        if any_status {
302            return Ok(Routing::Branch { on_status, default });
303        }
304        if let Some(default) = default {
305            return Ok(Routing::Branch {
306                on_status,
307                default: Some(default),
308            });
309        }
310    }
311
312    Ok(Routing::Custom(raw.clone()))
313}
314
315fn resolve_entry(doc: &FlowDoc) -> Option<String> {
316    if let Some(start) = &doc.start {
317        return Some(start.clone());
318    }
319    if doc.nodes.contains_key("in") {
320        return Some("in".to_string());
321    }
322    doc.nodes.keys().next().cloned()
323}