1#![forbid(unsafe_code)]
5#![allow(clippy::result_large_err)]
6
7pub mod config_flow;
8pub mod error;
9pub mod flow_bundle;
10pub mod ir;
11pub mod json_output;
12pub mod lint;
13pub mod loader;
14pub mod model;
15pub mod path_safety;
16pub mod registry;
17pub mod resolve;
18pub mod util;
19
20pub use flow_bundle::{
21 ComponentPin, FlowBundle, NodeRef, blake3_hex, canonicalize_json, extract_component_pins,
22 load_and_validate_bundle, load_and_validate_bundle_with_flow,
23};
24pub use json_output::{JsonDiagnostic, LintJsonOutput, lint_to_stdout_json};
25
26use crate::{error::Result, model::FlowDoc};
27use greentic_types::{
28 ComponentId, Flow, FlowComponentRef, FlowId, FlowKind, FlowMetadata, InputMapping, Node,
29 NodeId, OutputMapping, Routing, TelemetryHints, flow::FlowHasher,
30};
31use indexmap::IndexMap;
32use serde_json::Value;
33use std::collections::{BTreeMap, BTreeSet};
34use std::path::Path;
35
36pub fn map_flow_type(flow_type: &str) -> Result<FlowKind> {
38 match flow_type {
39 "messaging" => Ok(FlowKind::Messaging),
40 "event" | "events" => Ok(FlowKind::Event),
41 "component-config" => Ok(FlowKind::ComponentConfig),
42 "job" => Ok(FlowKind::Job),
43 "http" => Ok(FlowKind::Http),
44 other => Err(crate::error::FlowError::UnknownFlowType {
45 flow_type: other.to_string(),
46 location: crate::error::FlowErrorLocation::at_path("type"),
47 }),
48 }
49}
50
51pub fn compile_flow(doc: FlowDoc) -> Result<Flow> {
53 let kind = map_flow_type(&doc.flow_type)?;
54 let mut entrypoints = doc.entrypoints.clone();
55 if let Some(entry) = resolve_entry(&doc) {
56 entrypoints
57 .entry("default".to_string())
58 .or_insert_with(|| Value::String(entry));
59 }
60
61 let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
62 for (node_id_str, node_doc) in doc.nodes.iter() {
63 let node_id = NodeId::new(node_id_str.as_str()).map_err(|e| {
64 crate::error::FlowError::InvalidIdentifier {
65 kind: "node",
66 value: node_id_str.clone(),
67 detail: e.to_string(),
68 location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
69 }
70 })?;
71 let component_id = ComponentId::new(node_doc.component.as_str()).map_err(|e| {
72 crate::error::FlowError::InvalidIdentifier {
73 kind: "component",
74 value: node_doc.component.clone(),
75 detail: e.to_string(),
76 location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
77 }
78 })?;
79 let routing = compile_routing(&node_doc.routing, &doc.nodes, node_id_str)?;
80 let telemetry = node_doc
81 .telemetry
82 .as_ref()
83 .map(|t| TelemetryHints {
84 span_name: t.span_name.clone(),
85 attributes: t.attributes.clone(),
86 sampling: t.sampling.clone(),
87 })
88 .unwrap_or_default();
89 let node = Node {
90 id: node_id.clone(),
91 component: FlowComponentRef {
92 id: component_id,
93 pack_alias: node_doc.pack_alias.clone(),
94 operation: node_doc.operation.clone(),
95 },
96 input: InputMapping {
97 mapping: node_doc.payload.clone(),
98 },
99 output: OutputMapping {
100 mapping: node_doc
101 .output
102 .clone()
103 .unwrap_or_else(|| Value::Object(Default::default())),
104 },
105 routing,
106 telemetry,
107 };
108 nodes.insert(node_id, node);
109 }
110
111 let flow_id =
112 FlowId::new(doc.id.as_str()).map_err(|e| crate::error::FlowError::InvalidIdentifier {
113 kind: "flow",
114 value: doc.id.clone(),
115 detail: e.to_string(),
116 location: crate::error::FlowErrorLocation::at_path("id"),
117 })?;
118
119 Ok(Flow {
120 schema_version: "flow-v1".to_string(),
121 id: flow_id,
122 kind,
123 entrypoints,
124 nodes,
125 metadata: FlowMetadata {
126 title: doc.title,
127 description: doc.description,
128 tags: doc.tags.into_iter().collect::<BTreeSet<_>>(),
129 extra: doc.parameters,
130 },
131 })
132}
133
134pub fn compile_ygtc_str(src: &str) -> Result<Flow> {
136 let doc = loader::load_ygtc_from_str(src)?;
137 compile_flow(doc)
138}
139
140pub fn compile_ygtc_file(path: &Path) -> Result<Flow> {
142 let doc = loader::load_ygtc_from_path(path)?;
143 compile_flow(doc)
144}
145
146fn compile_routing(
147 raw: &Value,
148 nodes: &BTreeMap<String, crate::model::NodeDoc>,
149 node_id: &str,
150) -> Result<Routing> {
151 #[derive(serde::Deserialize)]
152 struct RouteDoc {
153 #[serde(default)]
154 to: Option<String>,
155 #[serde(default)]
156 out: Option<bool>,
157 #[serde(default)]
158 status: Option<String>,
159 #[serde(default)]
160 reply: Option<bool>,
161 }
162
163 let routes: Vec<RouteDoc> = if raw.is_null() {
164 Vec::new()
165 } else {
166 serde_json::from_value(raw.clone()).map_err(|e| crate::error::FlowError::Routing {
167 node_id: node_id.to_string(),
168 message: e.to_string(),
169 location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id}.routing")),
170 })?
171 };
172
173 if routes.len() == 1 {
174 let route = &routes[0];
175 let is_out = route.out.unwrap_or(false);
176 if route.reply.unwrap_or(false) {
177 return Ok(Routing::Reply);
178 }
179 if let Some(to) = &route.to {
180 if to == "out" || is_out {
181 return Ok(Routing::End);
182 }
183 if !nodes.contains_key(to) {
184 return Err(crate::error::FlowError::MissingNode {
185 target: to.clone(),
186 node_id: node_id.to_string(),
187 location: crate::error::FlowErrorLocation::at_path(format!(
188 "nodes.{node_id}.routing"
189 )),
190 });
191 }
192 return Ok(Routing::Next {
193 node_id: NodeId::new(to.as_str()).map_err(|e| {
194 crate::error::FlowError::InvalidIdentifier {
195 kind: "node",
196 value: to.clone(),
197 detail: e.to_string(),
198 location: crate::error::FlowErrorLocation::at_path(format!(
199 "nodes.{node_id}.routing"
200 )),
201 }
202 })?,
203 });
204 }
205 if is_out {
206 return Ok(Routing::End);
207 }
208 if route.status.is_some() {
209 return Ok(Routing::Custom(raw.clone()));
211 }
212 }
213
214 if routes.is_empty() {
215 return Ok(Routing::End);
216 }
217
218 if routes.len() >= 2 {
220 use std::collections::BTreeMap;
221 let mut on_status: BTreeMap<String, NodeId> = BTreeMap::new();
222 let mut default: Option<NodeId> = None;
223 let mut any_status = false;
224 for route in &routes {
225 if route.reply.unwrap_or(false) || route.out.unwrap_or(false) {
226 return Ok(Routing::Custom(raw.clone()));
227 }
228 let to = match &route.to {
229 Some(t) => t,
230 None => return Ok(Routing::Custom(raw.clone())),
231 };
232 if !nodes.contains_key(to) {
233 return Err(crate::error::FlowError::MissingNode {
234 target: to.clone(),
235 node_id: node_id.to_string(),
236 location: crate::error::FlowErrorLocation::at_path(format!(
237 "nodes.{node_id}.routing"
238 )),
239 });
240 }
241 let to_id = NodeId::new(to.as_str()).map_err(|e| {
242 crate::error::FlowError::InvalidIdentifier {
243 kind: "node",
244 value: to.clone(),
245 detail: e.to_string(),
246 location: crate::error::FlowErrorLocation::at_path(format!(
247 "nodes.{node_id}.routing"
248 )),
249 }
250 })?;
251 if let Some(status) = &route.status {
252 any_status = true;
253 on_status.insert(status.clone(), to_id);
254 } else {
255 default = Some(to_id);
256 }
257 }
258 if any_status {
259 return Ok(Routing::Branch { on_status, default });
260 }
261 if let Some(default) = default {
262 return Ok(Routing::Branch {
263 on_status,
264 default: Some(default),
265 });
266 }
267 }
268
269 Ok(Routing::Custom(raw.clone()))
270}
271
272fn resolve_entry(doc: &FlowDoc) -> Option<String> {
273 if let Some(start) = &doc.start {
274 return Some(start.clone());
275 }
276 if doc.nodes.contains_key("in") {
277 return Some("in".to_string());
278 }
279 doc.nodes.keys().next().cloned()
280}