1#![forbid(unsafe_code)]
5#![allow(clippy::result_large_err)]
6
7pub mod add_step;
8pub mod component_catalog;
9pub mod config_flow;
10pub mod error;
11pub mod flow_bundle;
12pub mod flow_ir;
13pub mod ir;
14pub mod json_output;
15pub mod lint;
16pub mod loader;
17pub mod model;
18pub mod path_safety;
19pub mod registry;
20pub mod resolve;
21pub mod splice;
22pub mod util;
23
24pub use flow_bundle::{
25 ComponentPin, FlowBundle, NodeRef, blake3_hex, canonicalize_json, extract_component_pins,
26 load_and_validate_bundle, load_and_validate_bundle_with_flow,
27};
28pub use json_output::{JsonDiagnostic, LintJsonOutput, lint_to_stdout_json};
29pub use splice::{NEXT_NODE_PLACEHOLDER, splice_node_after};
30
31use crate::{error::Result, model::FlowDoc};
32use greentic_types::{
33 ComponentId, Flow, FlowComponentRef, FlowId, FlowKind, FlowMetadata, InputMapping, Node,
34 NodeId, OutputMapping, Routing, TelemetryHints, flow::FlowHasher,
35};
36use indexmap::IndexMap;
37use serde_json::Value;
38use std::collections::{BTreeMap, BTreeSet};
39use std::path::Path;
40
41pub fn map_flow_type(flow_type: &str) -> Result<FlowKind> {
43 match flow_type {
44 "messaging" => Ok(FlowKind::Messaging),
45 "event" | "events" => Ok(FlowKind::Event),
46 "component-config" => Ok(FlowKind::ComponentConfig),
47 "job" => Ok(FlowKind::Job),
48 "http" => Ok(FlowKind::Http),
49 other => Err(crate::error::FlowError::UnknownFlowType {
50 flow_type: other.to_string(),
51 location: crate::error::FlowErrorLocation::at_path("type"),
52 }),
53 }
54}
55
56pub fn compile_flow(doc: FlowDoc) -> Result<Flow> {
58 let kind = map_flow_type(&doc.flow_type)?;
59 let mut entrypoints = doc.entrypoints.clone();
60 if let Some(entry) = resolve_entry(&doc) {
61 entrypoints
62 .entry("default".to_string())
63 .or_insert_with(|| Value::String(entry));
64 }
65
66 let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
67 for (node_id_str, node_doc) in doc.nodes.iter() {
68 let node_id = NodeId::new(node_id_str.as_str()).map_err(|e| {
69 crate::error::FlowError::InvalidIdentifier {
70 kind: "node",
71 value: node_id_str.clone(),
72 detail: e.to_string(),
73 location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
74 }
75 })?;
76 let component_id = ComponentId::new(node_doc.component.as_str()).map_err(|e| {
77 crate::error::FlowError::InvalidIdentifier {
78 kind: "component",
79 value: node_doc.component.clone(),
80 detail: e.to_string(),
81 location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
82 }
83 })?;
84 let routing = compile_routing(&node_doc.routing, &doc.nodes, node_id_str)?;
85 let telemetry = node_doc
86 .telemetry
87 .as_ref()
88 .map(|t| TelemetryHints {
89 span_name: t.span_name.clone(),
90 attributes: t.attributes.clone(),
91 sampling: t.sampling.clone(),
92 })
93 .unwrap_or_default();
94 let node = Node {
95 id: node_id.clone(),
96 component: FlowComponentRef {
97 id: component_id,
98 pack_alias: node_doc.pack_alias.clone(),
99 operation: node_doc.operation.clone(),
100 },
101 input: InputMapping {
102 mapping: node_doc.payload.clone(),
103 },
104 output: OutputMapping {
105 mapping: node_doc
106 .output
107 .clone()
108 .unwrap_or_else(|| Value::Object(Default::default())),
109 },
110 routing,
111 telemetry,
112 };
113 nodes.insert(node_id, node);
114 }
115
116 let flow_id =
117 FlowId::new(doc.id.as_str()).map_err(|e| crate::error::FlowError::InvalidIdentifier {
118 kind: "flow",
119 value: doc.id.clone(),
120 detail: e.to_string(),
121 location: crate::error::FlowErrorLocation::at_path("id"),
122 })?;
123
124 Ok(Flow {
125 schema_version: "flow-v1".to_string(),
126 id: flow_id,
127 kind,
128 entrypoints,
129 nodes,
130 metadata: FlowMetadata {
131 title: doc.title,
132 description: doc.description,
133 tags: doc.tags.into_iter().collect::<BTreeSet<_>>(),
134 extra: doc.parameters,
135 },
136 })
137}
138
139pub fn compile_ygtc_str(src: &str) -> Result<Flow> {
141 let doc = loader::load_ygtc_from_str(src)?;
142 compile_flow(doc)
143}
144
145pub fn compile_ygtc_file(path: &Path) -> Result<Flow> {
147 let doc = loader::load_ygtc_from_path(path)?;
148 compile_flow(doc)
149}
150
151fn compile_routing(
152 raw: &Value,
153 nodes: &BTreeMap<String, crate::model::NodeDoc>,
154 node_id: &str,
155) -> Result<Routing> {
156 #[derive(serde::Deserialize)]
157 struct RouteDoc {
158 #[serde(default)]
159 to: Option<String>,
160 #[serde(default)]
161 out: Option<bool>,
162 #[serde(default)]
163 status: Option<String>,
164 #[serde(default)]
165 reply: Option<bool>,
166 }
167
168 let routes: Vec<RouteDoc> = if raw.is_null() {
169 Vec::new()
170 } else {
171 serde_json::from_value(raw.clone()).map_err(|e| crate::error::FlowError::Routing {
172 node_id: node_id.to_string(),
173 message: e.to_string(),
174 location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id}.routing")),
175 })?
176 };
177
178 if routes.len() == 1 {
179 let route = &routes[0];
180 let is_out = route.out.unwrap_or(false);
181 if route.reply.unwrap_or(false) {
182 return Ok(Routing::Reply);
183 }
184 if let Some(to) = &route.to {
185 if to == "out" || is_out {
186 return Ok(Routing::End);
187 }
188 if !nodes.contains_key(to) {
189 return Err(crate::error::FlowError::MissingNode {
190 target: to.clone(),
191 node_id: node_id.to_string(),
192 location: crate::error::FlowErrorLocation::at_path(format!(
193 "nodes.{node_id}.routing"
194 )),
195 });
196 }
197 return Ok(Routing::Next {
198 node_id: NodeId::new(to.as_str()).map_err(|e| {
199 crate::error::FlowError::InvalidIdentifier {
200 kind: "node",
201 value: to.clone(),
202 detail: e.to_string(),
203 location: crate::error::FlowErrorLocation::at_path(format!(
204 "nodes.{node_id}.routing"
205 )),
206 }
207 })?,
208 });
209 }
210 if is_out {
211 return Ok(Routing::End);
212 }
213 if route.status.is_some() {
214 return Ok(Routing::Custom(raw.clone()));
216 }
217 }
218
219 if routes.is_empty() {
220 return Ok(Routing::End);
221 }
222
223 if routes.len() >= 2 {
225 use std::collections::BTreeMap;
226 let mut on_status: BTreeMap<String, NodeId> = BTreeMap::new();
227 let mut default: Option<NodeId> = None;
228 let mut any_status = false;
229 for route in &routes {
230 if route.reply.unwrap_or(false) || route.out.unwrap_or(false) {
231 return Ok(Routing::Custom(raw.clone()));
232 }
233 let to = match &route.to {
234 Some(t) => t,
235 None => return Ok(Routing::Custom(raw.clone())),
236 };
237 if !nodes.contains_key(to) {
238 return Err(crate::error::FlowError::MissingNode {
239 target: to.clone(),
240 node_id: node_id.to_string(),
241 location: crate::error::FlowErrorLocation::at_path(format!(
242 "nodes.{node_id}.routing"
243 )),
244 });
245 }
246 let to_id = NodeId::new(to.as_str()).map_err(|e| {
247 crate::error::FlowError::InvalidIdentifier {
248 kind: "node",
249 value: to.clone(),
250 detail: e.to_string(),
251 location: crate::error::FlowErrorLocation::at_path(format!(
252 "nodes.{node_id}.routing"
253 )),
254 }
255 })?;
256 if let Some(status) = &route.status {
257 any_status = true;
258 on_status.insert(status.clone(), to_id);
259 } else {
260 default = Some(to_id);
261 }
262 }
263 if any_status {
264 return Ok(Routing::Branch { on_status, default });
265 }
266 if let Some(default) = default {
267 return Ok(Routing::Branch {
268 on_status,
269 default: Some(default),
270 });
271 }
272 }
273
274 Ok(Routing::Custom(raw.clone()))
275}
276
277fn resolve_entry(doc: &FlowDoc) -> Option<String> {
278 if let Some(start) = &doc.start {
279 return Some(start.clone());
280 }
281 if doc.nodes.contains_key("in") {
282 return Some("in".to_string());
283 }
284 doc.nodes.keys().next().cloned()
285}