1#![forbid(unsafe_code)]
5#![allow(clippy::result_large_err)]
6
7pub mod config_flow;
8pub mod error;
9pub mod flow_bundle;
10pub mod ir;
11pub mod json_output;
12pub mod lint;
13pub mod loader;
14pub mod model;
15pub mod path_safety;
16pub mod registry;
17pub mod resolve;
18pub mod splice;
19pub mod util;
20
21pub use flow_bundle::{
22 ComponentPin, FlowBundle, NodeRef, blake3_hex, canonicalize_json, extract_component_pins,
23 load_and_validate_bundle, load_and_validate_bundle_with_flow,
24};
25pub use json_output::{JsonDiagnostic, LintJsonOutput, lint_to_stdout_json};
26pub use splice::{NEXT_NODE_PLACEHOLDER, splice_node_after};
27
28use crate::{error::Result, model::FlowDoc};
29use greentic_types::{
30 ComponentId, Flow, FlowComponentRef, FlowId, FlowKind, FlowMetadata, InputMapping, Node,
31 NodeId, OutputMapping, Routing, TelemetryHints, flow::FlowHasher,
32};
33use indexmap::IndexMap;
34use serde_json::Value;
35use std::collections::{BTreeMap, BTreeSet};
36use std::path::Path;
37
38pub fn map_flow_type(flow_type: &str) -> Result<FlowKind> {
40 match flow_type {
41 "messaging" => Ok(FlowKind::Messaging),
42 "event" | "events" => Ok(FlowKind::Event),
43 "component-config" => Ok(FlowKind::ComponentConfig),
44 "job" => Ok(FlowKind::Job),
45 "http" => Ok(FlowKind::Http),
46 other => Err(crate::error::FlowError::UnknownFlowType {
47 flow_type: other.to_string(),
48 location: crate::error::FlowErrorLocation::at_path("type"),
49 }),
50 }
51}
52
53pub fn compile_flow(doc: FlowDoc) -> Result<Flow> {
55 let kind = map_flow_type(&doc.flow_type)?;
56 let mut entrypoints = doc.entrypoints.clone();
57 if let Some(entry) = resolve_entry(&doc) {
58 entrypoints
59 .entry("default".to_string())
60 .or_insert_with(|| Value::String(entry));
61 }
62
63 let mut nodes: IndexMap<NodeId, Node, FlowHasher> = IndexMap::default();
64 for (node_id_str, node_doc) in doc.nodes.iter() {
65 let node_id = NodeId::new(node_id_str.as_str()).map_err(|e| {
66 crate::error::FlowError::InvalidIdentifier {
67 kind: "node",
68 value: node_id_str.clone(),
69 detail: e.to_string(),
70 location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
71 }
72 })?;
73 let component_id = ComponentId::new(node_doc.component.as_str()).map_err(|e| {
74 crate::error::FlowError::InvalidIdentifier {
75 kind: "component",
76 value: node_doc.component.clone(),
77 detail: e.to_string(),
78 location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id_str}")),
79 }
80 })?;
81 let routing = compile_routing(&node_doc.routing, &doc.nodes, node_id_str)?;
82 let telemetry = node_doc
83 .telemetry
84 .as_ref()
85 .map(|t| TelemetryHints {
86 span_name: t.span_name.clone(),
87 attributes: t.attributes.clone(),
88 sampling: t.sampling.clone(),
89 })
90 .unwrap_or_default();
91 let node = Node {
92 id: node_id.clone(),
93 component: FlowComponentRef {
94 id: component_id,
95 pack_alias: node_doc.pack_alias.clone(),
96 operation: node_doc.operation.clone(),
97 },
98 input: InputMapping {
99 mapping: node_doc.payload.clone(),
100 },
101 output: OutputMapping {
102 mapping: node_doc
103 .output
104 .clone()
105 .unwrap_or_else(|| Value::Object(Default::default())),
106 },
107 routing,
108 telemetry,
109 };
110 nodes.insert(node_id, node);
111 }
112
113 let flow_id =
114 FlowId::new(doc.id.as_str()).map_err(|e| crate::error::FlowError::InvalidIdentifier {
115 kind: "flow",
116 value: doc.id.clone(),
117 detail: e.to_string(),
118 location: crate::error::FlowErrorLocation::at_path("id"),
119 })?;
120
121 Ok(Flow {
122 schema_version: "flow-v1".to_string(),
123 id: flow_id,
124 kind,
125 entrypoints,
126 nodes,
127 metadata: FlowMetadata {
128 title: doc.title,
129 description: doc.description,
130 tags: doc.tags.into_iter().collect::<BTreeSet<_>>(),
131 extra: doc.parameters,
132 },
133 })
134}
135
136pub fn compile_ygtc_str(src: &str) -> Result<Flow> {
138 let doc = loader::load_ygtc_from_str(src)?;
139 compile_flow(doc)
140}
141
142pub fn compile_ygtc_file(path: &Path) -> Result<Flow> {
144 let doc = loader::load_ygtc_from_path(path)?;
145 compile_flow(doc)
146}
147
148fn compile_routing(
149 raw: &Value,
150 nodes: &BTreeMap<String, crate::model::NodeDoc>,
151 node_id: &str,
152) -> Result<Routing> {
153 #[derive(serde::Deserialize)]
154 struct RouteDoc {
155 #[serde(default)]
156 to: Option<String>,
157 #[serde(default)]
158 out: Option<bool>,
159 #[serde(default)]
160 status: Option<String>,
161 #[serde(default)]
162 reply: Option<bool>,
163 }
164
165 let routes: Vec<RouteDoc> = if raw.is_null() {
166 Vec::new()
167 } else {
168 serde_json::from_value(raw.clone()).map_err(|e| crate::error::FlowError::Routing {
169 node_id: node_id.to_string(),
170 message: e.to_string(),
171 location: crate::error::FlowErrorLocation::at_path(format!("nodes.{node_id}.routing")),
172 })?
173 };
174
175 if routes.len() == 1 {
176 let route = &routes[0];
177 let is_out = route.out.unwrap_or(false);
178 if route.reply.unwrap_or(false) {
179 return Ok(Routing::Reply);
180 }
181 if let Some(to) = &route.to {
182 if to == "out" || is_out {
183 return Ok(Routing::End);
184 }
185 if !nodes.contains_key(to) {
186 return Err(crate::error::FlowError::MissingNode {
187 target: to.clone(),
188 node_id: node_id.to_string(),
189 location: crate::error::FlowErrorLocation::at_path(format!(
190 "nodes.{node_id}.routing"
191 )),
192 });
193 }
194 return Ok(Routing::Next {
195 node_id: NodeId::new(to.as_str()).map_err(|e| {
196 crate::error::FlowError::InvalidIdentifier {
197 kind: "node",
198 value: to.clone(),
199 detail: e.to_string(),
200 location: crate::error::FlowErrorLocation::at_path(format!(
201 "nodes.{node_id}.routing"
202 )),
203 }
204 })?,
205 });
206 }
207 if is_out {
208 return Ok(Routing::End);
209 }
210 if route.status.is_some() {
211 return Ok(Routing::Custom(raw.clone()));
213 }
214 }
215
216 if routes.is_empty() {
217 return Ok(Routing::End);
218 }
219
220 if routes.len() >= 2 {
222 use std::collections::BTreeMap;
223 let mut on_status: BTreeMap<String, NodeId> = BTreeMap::new();
224 let mut default: Option<NodeId> = None;
225 let mut any_status = false;
226 for route in &routes {
227 if route.reply.unwrap_or(false) || route.out.unwrap_or(false) {
228 return Ok(Routing::Custom(raw.clone()));
229 }
230 let to = match &route.to {
231 Some(t) => t,
232 None => return Ok(Routing::Custom(raw.clone())),
233 };
234 if !nodes.contains_key(to) {
235 return Err(crate::error::FlowError::MissingNode {
236 target: to.clone(),
237 node_id: node_id.to_string(),
238 location: crate::error::FlowErrorLocation::at_path(format!(
239 "nodes.{node_id}.routing"
240 )),
241 });
242 }
243 let to_id = NodeId::new(to.as_str()).map_err(|e| {
244 crate::error::FlowError::InvalidIdentifier {
245 kind: "node",
246 value: to.clone(),
247 detail: e.to_string(),
248 location: crate::error::FlowErrorLocation::at_path(format!(
249 "nodes.{node_id}.routing"
250 )),
251 }
252 })?;
253 if let Some(status) = &route.status {
254 any_status = true;
255 on_status.insert(status.clone(), to_id);
256 } else {
257 default = Some(to_id);
258 }
259 }
260 if any_status {
261 return Ok(Routing::Branch { on_status, default });
262 }
263 if let Some(default) = default {
264 return Ok(Routing::Branch {
265 on_status,
266 default: Some(default),
267 });
268 }
269 }
270
271 Ok(Routing::Custom(raw.clone()))
272}
273
274fn resolve_entry(doc: &FlowDoc) -> Option<String> {
275 if let Some(start) = &doc.start {
276 return Some(start.clone());
277 }
278 if doc.nodes.contains_key("in") {
279 return Some("in".to_string());
280 }
281 doc.nodes.keys().next().cloned()
282}